Skip to content

Commit

Permalink
docs(samples): Add WriteAPI BUFFERED mode sample (#1338)
Browse files Browse the repository at this point in the history
* docs(samples): Add WriteAPI BUFFERED mode sample

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Verify the rows were added to the table

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Veronica Wasson <[email protected]>
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 4, 2021
1 parent 9370eb8 commit 5dfd523
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-bigquerystora
| Parallel Write Committed Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java) |
| Storage Arrow Sample | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/StorageArrowSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/StorageArrowSample.java) |
| Storage Sample | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/StorageSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/StorageSample.java) |
| Write Buffered Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/WriteBufferedStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/WriteBufferedStream.java) |
| Write Committed Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java) |
| Write Pending Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java) |
| Write To Default Stream | [source code](https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-bigquerystorage&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java) |
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.bigquerystorage;

// [START bigquerystorage_jsonstreamwriter_buffered]
import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1beta2.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1beta2.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1beta2.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1beta2.TableName;
import com.google.cloud.bigquery.storage.v1beta2.WriteStream;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Int64Value;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.json.JSONArray;
import org.json.JSONObject;

public class WriteBufferedStream {

public static void runWriteBufferedStream()
throws DescriptorValidationException, InterruptedException, IOException {
// TODO(developer): Replace these variables before running the sample.
String projectId = "MY_PROJECT_ID";
String datasetName = "MY_DATASET_NAME";
String tableName = "MY_TABLE_NAME";

writeBufferedStream(projectId, datasetName, tableName);
}

public static void writeBufferedStream(String projectId, String datasetName, String tableName)
throws DescriptorValidationException, InterruptedException, IOException {
try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
// Initialize a write stream for the specified table.
// For more information on WriteStream.Type, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/WriteStream.Type.html
WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.BUFFERED).build();
TableName parentTable = TableName.of(projectId, datasetName, tableName);
CreateWriteStreamRequest createWriteStreamRequest =
CreateWriteStreamRequest.newBuilder()
.setParent(parentTable.toString())
.setWriteStream(stream)
.build();
WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

// Use the JSON stream writer to send records in JSON format.
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
.build()) {
// Write two batches to the stream, each with 10 JSON records.
for (int i = 0; i < 2; i++) {
JSONArray jsonArr = new JSONArray();
for (int j = 0; j < 10; j++) {
// Create a JSON object that is compatible with the table schema.
JSONObject record = new JSONObject();
record.put("col1", String.format("buffered-record %03d", i));
jsonArr.put(record);
}
ApiFuture<AppendRowsResponse> future = writer.append(jsonArr);
AppendRowsResponse response = future.get();
}

// Flush the buffer.
FlushRowsRequest flushRowsRequest =
FlushRowsRequest.newBuilder()
.setWriteStream(writeStream.getName())
.setOffset(Int64Value.of(10 * 2 - 1)) // Advance the cursor to the latest record.
.build();
FlushRowsResponse flushRowsResponse = client.flushRows(flushRowsRequest);
// You can continue to write to the stream after flushing the buffer.
}
System.out.println("Appended and committed records successfully.");
} catch (ExecutionException e) {
// If the wrapped exception is a StatusRuntimeException, check the state of the operation.
// If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
// https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
System.out.println(e);
}
}
}
// [END bigquerystorage_jsonstreamwriter_buffered]
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.bigquerystorage;

import static com.google.common.truth.Truth.assertThat;
import static junit.framework.TestCase.assertNotNull;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQuery.DatasetDeleteOption;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TableResult;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.UUID;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class WriteBufferedStreamIT {

private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT");

private ByteArrayOutputStream bout;
private PrintStream out;
private BigQuery bigquery;
private String datasetName;
private String tableName;

private static void requireEnvVar(String varName) {
assertNotNull(
"Environment variable " + varName + " is required to perform these tests.",
System.getenv(varName));
}

@BeforeClass
public static void checkRequirements() {
requireEnvVar("GOOGLE_CLOUD_PROJECT");
}

@Before
public void setUp() {
bout = new ByteArrayOutputStream();
out = new PrintStream(bout);
System.setOut(out);

bigquery = BigQueryOptions.getDefaultInstance().getService();

// Create a new dataset and table for each test.
datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
tableName = "PENDING_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
Schema schema = Schema.of(Field.of("col1", StandardSQLTypeName.STRING));
bigquery.create(DatasetInfo.newBuilder(datasetName).build());
TableInfo tableInfo =
TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema))
.build();
bigquery.create(tableInfo);
}

@After
public void tearDown() {
bigquery.delete(
DatasetId.of(GOOGLE_CLOUD_PROJECT, datasetName), DatasetDeleteOption.deleteContents());
System.setOut(null);
}

@Test
public void testWriteBufferedStream() throws Exception {
WriteBufferedStream.writeBufferedStream(GOOGLE_CLOUD_PROJECT, datasetName, tableName);
assertThat(bout.toString()).contains("Appended and committed records successfully.");

// Verify that the records are visible in the table.
String query = "SELECT * FROM " + tableName;
QueryJobConfiguration queryConfig =
QueryJobConfiguration.newBuilder(query).setDefaultDataset(datasetName).build();
TableResult result = bigquery.query(queryConfig);
assertThat(result.getTotalRows()).isEqualTo(20);
}
}

0 comments on commit 5dfd523

Please sign in to comment.