Skip to content

Commit

Permalink
Code snippets for WriteAPI
Browse files Browse the repository at this point in the history
  • Loading branch information
VeronicaWasson committed Dec 29, 2020
1 parent 9b304b9 commit 9419d34
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 1 deletion.
2 changes: 1 addition & 1 deletion samples/install-without-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquerystorage</artifactId>
<version>1.7.2</version>
<version>1.7.3-SNAPSHOT</version>

This comment has been minimized.

Copy link
@stephaniewang526

stephaniewang526 Dec 30, 2020

this should remain the latest release version.

This comment has been minimized.

Copy link
@VeronicaWasson

VeronicaWasson Dec 30, 2020

Author Owner

Thanks - I actually didn't mean to commit my changes to the pom files!

This comment has been minimized.

Copy link
@yirutang

yirutang via email Dec 30, 2020

</dependency>
<!-- [END bigquerystorage_install_without_bom] -->

Expand Down
1 change: 1 addition & 0 deletions samples/snippets/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquerystorage</artifactId>
<version>1.7.3-SNAPSHOT</version>

This comment has been minimized.

Copy link
@stephaniewang526

stephaniewang526 Dec 30, 2020

the version of the artifact here is managed through the libraries-bom.

</dependency>
<!-- [END bigquerystorage_install_with_bom] -->

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.example.bigquerystorage;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.*;
import com.google.cloud.bigquery.storage.v1beta2.*;

import org.json.JSONArray;
import org.json.JSONObject;

import java.util.concurrent.TimeUnit;

public class WriteCommittedStream {

public static void runWriteCommittedStream() {
// TODO(developer): Replace these variables before running the sample.
String projectId = "MY_PROJECT_ID";
String datasetName = "MY_DATASET_NAME";
String tableName = "MY_TABLE_NAME";
writeCommittedStream(projectId, datasetName, tableName);
}

public static void writeCommittedStream(
String projectId, String datasetName, String tableName) {

try (BigQueryWriteClient client = BigQueryWriteClient.create()) {

// Initialize a write stream for the specified table.
WriteStream stream = WriteStream.newBuilder()
.setType(WriteStream.Type.COMMITTED)
.build();

TableName parent = TableName.of(projectId,datasetName,tableName);

CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(stream)
.build();
WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

// Use the JSON stream writer to send records in JSON format.
try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(
writeStream.getName(),
writeStream.getTableSchema(),
client).build()) {

for (int i = 0; i < 10; i++) {
JSONObject record = new JSONObject();
record.put("col1", String.format("record %03d",i));
JSONArray jsonArr = new JSONArray();
jsonArr.put(record);

ApiFuture<AppendRowsResponse> future = writer.append(jsonArr,false);

This comment has been minimized.

Copy link
@yirutang

yirutang Dec 29, 2020

Append using offset, otherwise there is not much sense to use committed mode. And you can show if append using the same offset, a ALREADY_EXISTED will be returned.

AppendRowsResponse response = future.get(2000, TimeUnit.MILLISECONDS);

This comment has been minimized.

Copy link
@yirutang

yirutang Dec 29, 2020

There shouldn't need to be a time out.

}

}

System.out.println("Appended records successfully.");

}
catch (Exception e) {
System.out.println("Failed to append records. \n" + e.toString());
}
}

public static void writeToDefaultStream(
String projectId, String datasetName, String tableName) {

TableName parent = TableName.of(projectId, datasetName, tableName);

BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
Table table = bigquery.getTable(datasetName, tableName);
Schema schema = table.getDefinition().getSchema();

try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(parent.toString(),schema)
.createDefaultStream()
.build())
{

for (int i = 0; i < 10; i++) {
JSONObject record = new JSONObject();
record.put("col1", String.format("record %03d",i));
JSONArray jsonArr = new JSONArray();
jsonArr.put(record);

ApiFuture<AppendRowsResponse> future = writer.append(jsonArr,false);
AppendRowsResponse response = future.get(2000, TimeUnit.MILLISECONDS);

This comment has been minimized.

Copy link
@yirutang

yirutang Dec 29, 2020

Maybe can add a simple retry here. If the caught exception is internal, aborted or cancelled, retry the message.

}
}
catch (Exception e) {
System.out.println(e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.example.bigquerystorage;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1beta2.*;

import org.json.JSONArray;
import org.json.JSONObject;

import java.util.concurrent.TimeUnit;

public class WritePendingStream {

public static void runWritePendingStream() {
// TODO(developer): Replace these variables before running the sample.
String projectId = "MY_PROJECT_ID";
String datasetName = "MY_DATASET_NAME";
String tableName = "MY_TABLE_NAME";

writePendingStream(projectId, datasetName, tableName);
}

public static void writePendingStream(
String projectId, String datasetName, String tableName) {

try (BigQueryWriteClient client = BigQueryWriteClient.create()) {

WriteStream stream = WriteStream.newBuilder()
.setType(WriteStream.Type.PENDING)
.build();

TableName parent = TableName.of(projectId,datasetName,tableName);

CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(stream)
.build();
WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(
writeStream.getName(),
writeStream.getTableSchema(),
client).build()) {

for (int i = 0; i < 10; i++) {
JSONObject record = new JSONObject();
record.put("col1", String.format("batch-record %03d",i));
JSONArray jsonArr = new JSONArray();
jsonArr.put(record);

ApiFuture<AppendRowsResponse> future = writer.append(jsonArr,false);
AppendRowsResponse response = future.get(2000, TimeUnit.MILLISECONDS);

}
FinalizeWriteStreamResponse finalizeResponse =
client.finalizeWriteStream(writeStream.getName());
System.out.println("Rows written: " + finalizeResponse.getRowCount());
}

// Commit the streams
BatchCommitWriteStreamsRequest commitRequest = BatchCommitWriteStreamsRequest.newBuilder()
.setParent(parent.toString())
.addWriteStreams(writeStream.getName())
.build();
BatchCommitWriteStreamsResponse commitResponse = client.batchCommitWriteStreams(commitRequest);
}
catch (Exception e) {
System.out.println(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@

package com.example.bigquerystorage;

import static com.google.common.truth.Truth.assertThat;
import static junit.framework.TestCase.assertNotNull;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class WriteAPISampleIT {

private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT");
private static final String BIGQUERY_DATASET_NAME = System.getenv("BIGQUERY_DATASET_NAME");
private static final String BIGQUERY_TABLE_NAME = System.getenv("BIGQUERY_TABLE_NAME");

private final Logger log = Logger.getLogger(this.getClass().getName());
private ByteArrayOutputStream bout;
private PrintStream out;
private PrintStream originalPrintStream;

private static void requireEnvVar(String varName) {
assertNotNull(
"Environment variable " + varName + " is required to perform these tests.",
System.getenv(varName));
}

@BeforeClass
public static void checkRequirements() {
requireEnvVar("GOOGLE_CLOUD_PROJECT");
requireEnvVar("BIGQUERY_DATASET_NAME");
requireEnvVar("BIGQUERY_TABLE_NAME");
}

@Before
public void setUp() {
bout = new ByteArrayOutputStream();
out = new PrintStream(bout);
originalPrintStream = System.out;
System.setOut(out);
}

@After
public void tearDown() {
System.out.flush();
System.setOut(originalPrintStream);
log.log(Level.INFO, "\n" + bout.toString());
}


@Test
public void testWriteCommittedStream() throws Exception {
WriteCommittedStream.writeCommittedStream(
GOOGLE_CLOUD_PROJECT, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME);
}

@Test
public void testWriteToDefaultStream() throws Exception {
WriteCommittedStream.writeToDefaultStream(
GOOGLE_CLOUD_PROJECT, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME);
}

@Test
public void testWritePendingStream() throws Exception {
WritePendingStream.writePendingStream(
GOOGLE_CLOUD_PROJECT, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME);
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@

package com.example.bigquerystorage;

import static com.google.common.truth.Truth.assertThat;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class WritePendingStreamIT {

private static final String BIGQUERY_PROJECT_NAME = System.getenv("BIGQUERY_PROJECT_NAME");
private static final String BIGQUERY_DATASET_NAME = System.getenv("BIGQUERY_DATASET_NAME");
private static final String BIGQUERY_TABLE_NAME = System.getenv("BIGQUERY_TABLE_NAME");

@Test
public void testWritePendingStream() throws Exception {
WritePendingStream.writePendingStream(BIGQUERY_PROJECT_NAME, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME);
}
}

0 comments on commit 9419d34

Please sign in to comment.