[BEAM-876] Support schemaUpdateOption in BigQueryIO #9524

Merged: 5 commits, Nov 25, 2019

BatchLoads.java
@@ -22,7 +22,9 @@
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

import com.google.api.services.bigquery.model.TableRow;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
@@ -36,6 +38,7 @@
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.SchemaUpdateOption;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -115,6 +118,7 @@ class BatchLoads<DestinationT, ElementT>
private BigQueryServices bigQueryServices;
private final WriteDisposition writeDisposition;
private final CreateDisposition createDisposition;
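// Not final: defaulted in the constructor and overridable via setSchemaUpdateOptions.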
private Set<SchemaUpdateOption> schemaUpdateOptions;
private final boolean ignoreUnknownValues;
// Indicates that we are writing to a constant single table. If this is the case, we will create
// the table, even if there is no data in it.
@@ -166,6 +170,11 @@ class BatchLoads<DestinationT, ElementT>
this.elementCoder = elementCoder;
this.kmsKey = kmsKey;
this.rowWriterFactory = rowWriterFactory;
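// Default to no schema update options; BigQueryIO overrides this via setSchemaUpdateOptions.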
schemaUpdateOptions = Collections.emptySet();
}

void setSchemaUpdateOptions(Set<SchemaUpdateOption> schemaUpdateOptions) {
this.schemaUpdateOptions = schemaUpdateOptions;
}

void setTestServices(BigQueryServices bigQueryServices) {
@@ -587,7 +596,8 @@ private PCollection<KV<TableDestination, String>> writeTempTables(
maxRetryJobs,
ignoreUnknownValues,
kmsKey,
rowWriterFactory.getSourceFormat()));
rowWriterFactory.getSourceFormat(),
schemaUpdateOptions));
}

// In the case where the files fit into a single load job, there's no need to write temporary
@@ -621,7 +631,8 @@ void writeSinglePartition(
maxRetryJobs,
ignoreUnknownValues,
kmsKey,
rowWriterFactory.getSourceFormat()));
rowWriterFactory.getSourceFormat(),
schemaUpdateOptions));
}

private WriteResult writeResult(Pipeline p) {

BigQueryIO.java
@@ -43,8 +43,10 @@
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;
import com.google.cloud.bigquery.storage.v1beta1.Storage.Stream;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -1635,6 +1637,7 @@ public static <T> Write<T> write() {
.setBigQueryServices(new BigQueryServicesImpl())
.setCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
.setWriteDisposition(Write.WriteDisposition.WRITE_EMPTY)
.setSchemaUpdateOptions(Collections.emptySet())
.setNumFileShards(0)
.setMethod(Write.Method.DEFAULT)
.setExtendedErrorInfo(false)
@@ -1729,6 +1732,8 @@ public enum Method {
abstract CreateDisposition getCreateDisposition();

abstract WriteDisposition getWriteDisposition();

abstract Set<SchemaUpdateOption> getSchemaUpdateOptions();

/** Table description. Default is empty. */
@Nullable
abstract String getTableDescription();
@@ -1807,6 +1812,8 @@ abstract Builder<T> setAvroSchemaFactory(

abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);

abstract Builder<T> setSchemaUpdateOptions(Set<SchemaUpdateOption> schemaUpdateOptions);

abstract Builder<T> setTableDescription(String tableDescription);

abstract Builder<T> setValidate(boolean validate);
@@ -1910,6 +1917,25 @@ public enum WriteDisposition {
WRITE_EMPTY
}

/**
* An enumeration type for the BigQuery schema update options strings.
*
* <p>Note from the BigQuery API doc -- schema update options are supported in two cases: when
* writeDisposition is WRITE_APPEND, or when writeDisposition is WRITE_TRUNCATE and the
* destination table is a partition of a table, specified by partition decorators.
*
* @see <a
* href="https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationquery">
* <code>configuration.query.schemaUpdateOptions</code> in the BigQuery Jobs API</a>
*/
public enum SchemaUpdateOption {
/** Allow adding a nullable field to the schema. */
ALLOW_FIELD_ADDITION,

/** Allow relaxing a required field in the original schema to nullable. */
ALLOW_FIELD_RELAXATION
}
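
A minimal usage sketch for these options (illustrative only; the table spec is a placeholder, not taken from this change), appending via file loads while letting BigQuery add new nullable columns:

  // Assumes java.util.EnumSet and the usual BigQueryIO imports.
  BigQueryIO.Write<TableRow> write =
      BigQueryIO.writeTableRows()
          .to("my-project:my_dataset.my_table")
          .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
          .withSchemaUpdateOptions(
              EnumSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION));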

/**
* Writes to the given table, specified in the format described in {@link
* BigQueryHelpers#parseTableSpec}.
@@ -2098,6 +2124,12 @@ public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
return toBuilder().setWriteDisposition(writeDisposition).build();
}

/** Allows the schema of the destination table to be updated as a side effect of the write. */
public Write<T> withSchemaUpdateOptions(Set<SchemaUpdateOption> schemaUpdateOptions) {
checkArgument(schemaUpdateOptions != null, "schemaUpdateOptions can not be null");
return toBuilder().setSchemaUpdateOptions(schemaUpdateOptions).build();
}
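
Per the SchemaUpdateOption javadoc above, these options also take effect under WRITE_TRUNCATE when the destination is a single partition addressed by a decorator. A sketch under that assumption (placeholder table and partition):

  BigQueryIO.writeTableRows()
      .to("my-project:my_dataset.my_table$20190101") // "$YYYYMMDD" selects one partition
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
      .withSchemaUpdateOptions(
          EnumSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_RELAXATION));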

/** Specifies the table description. */
public Write<T> withTableDescription(String tableDescription) {
checkArgument(tableDescription != null, "tableDescription can not be null");
@@ -2589,6 +2621,9 @@ private <DestinationT, ElementT> WriteResult continueExpandTyped(
rowWriterFactory,
getKmsKey());
batchLoads.setTestServices(getBigQueryServices());
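// The builder defaults schemaUpdateOptions to an empty set and withSchemaUpdateOptions
// rejects null, so this guard is purely defensive.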
if (getSchemaUpdateOptions() != null) {
batchLoads.setSchemaUpdateOptions(getSchemaUpdateOptions());
}
if (getMaxFilesPerBundle() != null) {
batchLoads.setMaxNumWritersPerBundle(getMaxFilesPerBundle());
}
@@ -2634,6 +2669,9 @@ public void populateDisplayData(DisplayData.Builder builder) {
.add(
DisplayData.item("writeDisposition", getWriteDisposition().toString())
.withLabel("Table WriteDisposition"))
.add(
DisplayData.item("schemaUpdateOptions", getSchemaUpdateOptions().toString())
.withLabel("Table SchemaUpdateOptions"))
.addIfNotDefault(
DisplayData.item("validation", getValidate()).withLabel("Validation Enabled"), true)
.addIfNotNull(

WriteTables.java
@@ -29,6 +29,8 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -38,6 +40,7 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.PendingJob;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.PendingJobManager;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.SchemaUpdateOption;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
@@ -90,6 +93,7 @@ class WriteTables<DestinationT>
private final PCollectionView<String> loadJobIdPrefixView;
private final WriteDisposition firstPaneWriteDisposition;
private final CreateDisposition firstPaneCreateDisposition;
private final Set<SchemaUpdateOption> schemaUpdateOptions;
private final DynamicDestinations<?, DestinationT> dynamicDestinations;
private final List<PCollectionView<?>> sideInputs;
private final TupleTag<KV<TableDestination, String>> mainOutputTag;
@@ -219,7 +223,8 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Exception
tableSchema,
partitionFiles,
writeDisposition,
createDisposition);
createDisposition,
schemaUpdateOptions);
pendingJobs.add(
new PendingJobData(window, retryJob, partitionFiles, tableDestination, tableReference));
}
@@ -288,7 +293,9 @@ public WriteTables(
int maxRetryJobs,
boolean ignoreUnknownValues,
String kmsKey,
String sourceFormat) {
String sourceFormat,
Set<SchemaUpdateOption> schemaUpdateOptions) {
this.tempTable = tempTable;
this.bqServices = bqServices;
this.loadJobIdPrefixView = loadJobIdPrefixView;
@@ -303,6 +310,7 @@
this.ignoreUnknownValues = ignoreUnknownValues;
this.kmsKey = kmsKey;
this.sourceFormat = sourceFormat;
this.schemaUpdateOptions = schemaUpdateOptions;
}

@Override
@@ -346,7 +354,8 @@ private PendingJob startLoad(
@Nullable TableSchema schema,
List<String> gcsUris,
WriteDisposition writeDisposition,
CreateDisposition createDisposition) {
CreateDisposition createDisposition,
Set<SchemaUpdateOption> schemaUpdateOptions) {
JobConfigurationLoad loadConfig =
new JobConfigurationLoad()
.setDestinationTable(ref)
Expand All @@ -356,6 +365,11 @@ private PendingJob startLoad(
.setCreateDisposition(createDisposition.name())
.setSourceFormat(sourceFormat)
.setIgnoreUnknownValues(ignoreUnknownValues);
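// The load API expects schema update options as their string names, so map the
// enum values before setting them on the job config.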
if (schemaUpdateOptions != null) {
List<String> options =
schemaUpdateOptions.stream().map(Enum::name).collect(Collectors.toList());
loadConfig.setSchemaUpdateOptions(options);
}
if (timePartitioning != null) {
loadConfig.setTimePartitioning(timePartitioning);
// only set clustering if timePartitioning is set

FakeJobService.java
@@ -51,10 +51,12 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
@@ -264,6 +266,12 @@ public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery query,
throw new UnsupportedOperationException();
}

public Collection<Job> getAllJobs() {
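// Collect into a new list under the lock so callers can iterate without racing
// concurrent job submissions.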
synchronized (allJobs) {
return allJobs.values().stream().map(j -> j.job).collect(Collectors.toList());
}
}

@Override
public Job getJob(JobReference jobRef) {
try {

BigQueryIOWriteTest.java
@@ -36,6 +36,8 @@

import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.api.services.bigquery.model.TableFieldSchema;
@@ -54,11 +56,14 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AtomicCoder;
@@ -68,6 +73,7 @@
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.SchemaUpdateOption;
import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
@@ -1179,6 +1185,8 @@ public void testBuildWriteDisplayData() {
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withSchemaUpdateOptions(
EnumSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION))
.withTableDescription(tblDescription)
.withoutValidation();

@@ -1194,6 +1202,11 @@
displayData,
hasDisplayItem(
"writeDisposition", BigQueryIO.Write.WriteDisposition.WRITE_APPEND.toString()));
assertThat(
displayData,
hasDisplayItem(
"schemaUpdateOptions",
EnumSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION).toString()));
assertThat(displayData, hasDisplayItem("tableDescription", tblDescription));
assertThat(displayData, hasDisplayItem("validation", false));
}
@@ -1571,7 +1584,8 @@ public void testWriteTables() throws Exception {
4,
false,
null,
"NEWLINE_DELIMITED_JSON");
"NEWLINE_DELIMITED_JSON",
Collections.emptySet());

PCollection<KV<TableDestination, String>> writeTablesOutput =
writeTablesInput.apply(writeTables);
@@ -1854,4 +1868,59 @@ public void testWrongErrorConfigs() {
+ "uses extended errors information. Use getFailedInsertsWithErr instead"));
}
}

void schemaUpdateOptionsTest(
BigQueryIO.Write.Method insertMethod, Set<SchemaUpdateOption> schemaUpdateOptions)
throws Exception {
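// Runs a write with the given insert method and schema update options against the
// fake services, then asserts that every load job issued carries those options.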
TableRow row = new TableRow().set("date", "2019-01-01").set("number", "1");

TableSchema schema =
new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("date").setType("DATE"),
new TableFieldSchema().setName("number").setType("INTEGER")));

Write<TableRow> writeTransform =
BigQueryIO.writeTableRows()
.to("project-id:dataset-id.table-id")
.withTestServices(fakeBqServices)
.withMethod(insertMethod)
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withSchemaUpdateOptions(schemaUpdateOptions);

p.apply(Create.<TableRow>of(row)).apply(writeTransform);
p.run();

List<String> expectedOptions =
schemaUpdateOptions.stream().map(Enum::name).collect(Collectors.toList());

for (Job job : fakeJobService.getAllJobs()) {
JobConfigurationLoad configuration = job.getConfiguration().getLoad();
assertEquals(expectedOptions, configuration.getSchemaUpdateOptions());
}
}

@Test
public void testWriteFileSchemaUpdateOptionAllowFieldAddition() throws Exception {
Set<SchemaUpdateOption> options = EnumSet.of(SchemaUpdateOption.ALLOW_FIELD_ADDITION);
schemaUpdateOptionsTest(BigQueryIO.Write.Method.FILE_LOADS, options);
}

@Test
public void testWriteFileSchemaUpdateOptionAllowFieldRelaxation() throws Exception {
Set<SchemaUpdateOption> options = EnumSet.of(SchemaUpdateOption.ALLOW_FIELD_RELAXATION);
schemaUpdateOptionsTest(BigQueryIO.Write.Method.FILE_LOADS, options);
}

@Test
public void testWriteFileSchemaUpdateOptionAll() throws Exception {
Set<SchemaUpdateOption> options = EnumSet.allOf(SchemaUpdateOption.class);
schemaUpdateOptionsTest(BigQueryIO.Write.Method.FILE_LOADS, options);
}
}