Skip to content

Commit

Permalink
feat: Update WriteToDefaultStream sample to match best practices
Browse files Browse the repository at this point in the history
  • Loading branch information
gnanda committed Apr 28, 2022
1 parent 5d4c7e1 commit 6d4b3d0
Showing 1 changed file with 156 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,28 @@
package com.example.bigquerystorage;

// [START bigquerystorage_jsonstreamwriter_default]

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import io.grpc.Status;
import io.grpc.Status.Code;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;

Expand All @@ -45,36 +55,154 @@ public static void runWriteToDefaultStream()

public static void writeToDefaultStream(String projectId, String datasetName, String tableName)
throws DescriptorValidationException, InterruptedException, IOException {
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
Table table = bigquery.getTable(datasetName, tableName);
TableName parentTable = TableName.of(projectId, datasetName, tableName);
Schema schema = table.getDefinition().getSchema();
TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema);

// Use the JSON stream writer to send records in JSON format.
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(parentTable.toString(), tableSchema).build()) {
// Write two batches to the stream, each with 10 JSON records. A writer should be used for as
// much writes as possible. Creating a writer for just one write is an antipattern.
for (int i = 0; i < 2; i++) {
// Create a JSON object that is compatible with the table schema.
JSONArray jsonArr = new JSONArray();
for (int j = 0; j < 10; j++) {
JSONObject record = new JSONObject();
record.put("test_string", String.format("record %03d-%03d", i, j));
jsonArr.put(record);

DataWriter writer = new DataWriter();
// One time initialization for the worker.
writer.initialize(parentTable);

// Write two batches of fake data to the stream, each with 10 JSON records. Data may be
// batched up to the maximum request size:
// https://cloud.google.com/bigquery/quotas#write-api-limits
for (int i = 0; i < 2; i++) {
// Create a JSON object that is compatible with the table schema.
JSONArray jsonArr = new JSONArray();
for (int j = 0; j < 10; j++) {
JSONObject record = new JSONObject();
record.put("test_string", String.format("record %03d-%03d", i, j));
jsonArr.put(record);
}

writer.append(new AppendContext(jsonArr, 0));
}

// Final cleanup for the stream during worker teardown.
writer.cleanup();
System.out.println("Appended records successfully.");
}

private static class AppendContext {

JSONArray data;
int retryCount = 0;

AppendContext(JSONArray data, int retryCount) {
this.data = data;
this.retryCount = retryCount;
}
}

private static class DataWriter {

private static final int MAX_RETRY_COUNT = 2;
private static final ImmutableList<Code> RETRIABLE_ERROR_CODES = ImmutableList.of(Code.INTERNAL,
Code.ABORTED, Code.CANCELLED);

// Track the number of in-flight requests to wait for all responses before shutting down.
private final Phaser inflightRequestCount = new Phaser(1);
private final Object lock = new Object();
private JsonStreamWriter streamWriter;
@GuardedBy("lock")
private RuntimeException error = null;

public void initialize(TableName parentTable)
throws DescriptorValidationException, IOException, InterruptedException {
// Retrive table schema information.
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
Table table = bigquery.getTable(parentTable.getDataset(), parentTable.getTable());
Schema schema = table.getDefinition().getSchema();
TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema);

// Use the JSON stream writer to send records in JSON format. Specify the table name to write
// to the default stream.
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
streamWriter = JsonStreamWriter.newBuilder(parentTable.toString(), tableSchema).build();
}

public void append(AppendContext appendContext)
throws DescriptorValidationException, IOException {
synchronized (this.lock) {
// If earlier appends have failed, we need to reset before continuing.
if (this.error != null) {
throw this.error;
}
ApiFuture<AppendRowsResponse> future = writer.append(jsonArr);
AppendRowsResponse response = future.get();
}
System.out.println("Appended records successfully.");
} catch (ExecutionException e) {
// If the wrapped exception is a StatusRuntimeException, check the state of the operation.
// If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
// https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
System.out.println("Failed to append records. \n" + e.toString());
// Append asynchronously for increased throughput.
ApiFuture<AppendRowsResponse> future = streamWriter.append(appendContext.data);
ApiFutures.addCallback(future, new AppendCompleteCallback(this, appendContext),
MoreExecutors.directExecutor());

// Increase the count of in-flight requests.
inflightRequestCount.register();
}

public void cleanup() {
// Wait for all in-flight requests to complete.
inflightRequestCount.arriveAndAwaitAdvance();

// Close the connection to the server.
streamWriter.close();

// Verify that no error occurred in the stream.
synchronized (this.lock) {
if (this.error != null) {
throw this.error;
}
}
}

static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {

private final DataWriter parent;
private final AppendContext appendContext;

public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) {
this.parent = parent;
this.appendContext = appendContext;
}

public void onSuccess(AppendRowsResponse response) {
System.out.format("Append success\n");
done();
}

public void onFailure(Throwable throwable) {
// If the wrapped exception is a StatusRuntimeException, check the state of the operation.
// If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information,
// see: https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
Status status = Status.fromThrowable(throwable);
if (appendContext.retryCount < MAX_RETRY_COUNT && RETRIABLE_ERROR_CODES.contains(
status.getCode())) {
appendContext.retryCount++;
try {
// Since default stream appends are not ordered, we can simply retry the appends.
// Retrying with exclusive streams requires more careful consideration.
this.parent.append(appendContext);
// Mark the existing attempt as done since it's being retried.
done();
return;
} catch (Exception e) {
// Fall through to return error.
System.out.format("Failed to retry append: %s\n", e);
}
}

synchronized (this.parent.lock) {
if (this.parent.error == null) {
StorageException storageException = Exceptions.toStorageException(throwable);
this.parent.error =
(storageException != null) ? storageException : new RuntimeException(throwable);
}
}
System.out.format("Error: %s\n", throwable);
done();
}

private void done() {
// Reduce the count of in-flight requests.
this.parent.inflightRequestCount.arriveAndDeregister();
}
}
}
}
Expand Down

0 comments on commit 6d4b3d0

Please sign in to comment.