Skip to content

Commit

Permalink
docs(samples): update WriteToDefaultStream.java sample
Browse files Browse the repository at this point in the history
Towards #1249
BQToBQStorageSchemaConverter.java will go into google-cloud-bigquery eventually. For now it acts as a resource for developers who want to use default stream to write.
  • Loading branch information
stephaniewang526 committed Sep 9, 2021
1 parent a5facf9 commit 3e2b009
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.example.bigquerystorage;

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.storage.v1beta2.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1beta2.TableSchema;
import com.google.common.collect.ImmutableMap;

/** Converts structure from BigQuery client to BigQueryStorage client */
public class BQToBQStorageSchemaConverter {
private static ImmutableMap<Field.Mode, TableFieldSchema.Mode> BQTableSchemaModeMap =
ImmutableMap.of(
Field.Mode.NULLABLE, TableFieldSchema.Mode.NULLABLE,
Field.Mode.REPEATED, TableFieldSchema.Mode.REPEATED,
Field.Mode.REQUIRED, TableFieldSchema.Mode.REQUIRED);

private static ImmutableMap<StandardSQLTypeName, TableFieldSchema.Type> BQTableSchemaTypeMap =
new ImmutableMap.Builder<StandardSQLTypeName, TableFieldSchema.Type>()
.put(StandardSQLTypeName.BOOL, TableFieldSchema.Type.BOOL)
.put(StandardSQLTypeName.BYTES, TableFieldSchema.Type.BYTES)
.put(StandardSQLTypeName.DATE, TableFieldSchema.Type.DATE)
.put(StandardSQLTypeName.DATETIME, TableFieldSchema.Type.DATETIME)
.put(StandardSQLTypeName.FLOAT64, TableFieldSchema.Type.DOUBLE)
.put(StandardSQLTypeName.GEOGRAPHY, TableFieldSchema.Type.GEOGRAPHY)
.put(StandardSQLTypeName.INT64, TableFieldSchema.Type.INT64)
.put(StandardSQLTypeName.NUMERIC, TableFieldSchema.Type.NUMERIC)
.put(StandardSQLTypeName.STRING, TableFieldSchema.Type.STRING)
.put(StandardSQLTypeName.STRUCT, TableFieldSchema.Type.STRUCT)
.put(StandardSQLTypeName.TIME, TableFieldSchema.Type.TIME)
.put(StandardSQLTypeName.TIMESTAMP, TableFieldSchema.Type.TIMESTAMP)
.build();

/**
* Converts from BigQuery client Table Schema to bigquery storage API Table Schema.
*
* @param schema the BigQuery client Table Schema
* @return the bigquery storage API Table Schema
*/
public static TableSchema ConvertTableSchema(Schema schema) {
TableSchema.Builder result = TableSchema.newBuilder();
for (int i = 0; i < schema.getFields().size(); i++) {
result.addFields(i, ConvertFieldSchema(schema.getFields().get(i)));
}
return result.build();
}

/**
* Converts from bigquery v2 Field Schema to bigquery storage API Field Schema.
*
* @param field the BigQuery client Field Schema
* @return the bigquery storage API Field Schema
*/
public static TableFieldSchema ConvertFieldSchema(Field field) {
TableFieldSchema.Builder result = TableFieldSchema.newBuilder();
if (field.getMode() == null) {
field = field.toBuilder().setMode(Field.Mode.NULLABLE).build();
}
result.setMode(BQTableSchemaModeMap.get(field.getMode()));
result.setName(field.getName());
result.setType(BQTableSchemaTypeMap.get(field.getType().getStandardType()));
if (field.getDescription() != null) {
result.setDescription(field.getDescription());
}
if (field.getSubFields() != null) {
for (int i = 0; i < field.getSubFields().size(); i++) {
result.addFields(i, ConvertFieldSchema(field.getSubFields().get(i)));
}
}
return result.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@

// [START bigquerystorage_jsonstreamwriter_default]
import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1beta2.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1beta2.TableName;
import com.google.cloud.bigquery.storage.v1beta2.TableSchema;
import com.google.protobuf.Descriptors.DescriptorValidationException;
Expand All @@ -37,20 +40,16 @@ public static void runWriteToDefaultStream()
String projectId = "MY_PROJECT_ID";
String datasetName = "MY_DATASET_NAME";
String tableName = "MY_TABLE_NAME";
TableFieldSchema strField =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.STRING)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_string")
.build();
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, strField).build();
writeToDefaultStream(projectId, datasetName, tableName, tableSchema);
writeToDefaultStream(projectId, datasetName, tableName);
}

public static void writeToDefaultStream(
String projectId, String datasetName, String tableName, TableSchema tableSchema)
public static void writeToDefaultStream(String projectId, String datasetName, String tableName)
throws DescriptorValidationException, InterruptedException, IOException {
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
Table table = bigquery.getTable(datasetName, tableName);
TableName parentTable = TableName.of(projectId, datasetName, tableName);
Schema schema = table.getDefinition().getSchema();
TableSchema tableSchema = BQToBQStorageSchemaConverter.ConvertTableSchema(schema);

// Use the JSON stream writer to send records in JSON format.
// For more information about JsonStreamWriter, see:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.storage.v1beta2.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1beta2.TableSchema;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.UUID;
Expand All @@ -52,7 +50,6 @@ public class WriteToDefaultStreamIT {
private BigQuery bigquery;
private String datasetName;
private String tableName;
private TableSchema tableSchema;

private static void requireEnvVar(String varName) {
assertNotNull(
Expand All @@ -76,23 +73,10 @@ public void setUp() {
// Create a new dataset and table for each test.
datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
tableName = "DEFAULT_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
Schema schema = Schema.of(Field.of("test_string", StandardSQLTypeName.STRING));
bigquery.create(DatasetInfo.newBuilder(datasetName).build());
TableFieldSchema strField =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.STRING)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_string")
.build();
tableSchema = TableSchema.newBuilder().addFields(0, strField).build();
TableInfo tableInfo =
TableInfo.newBuilder(
TableId.of(datasetName, tableName),
StandardTableDefinition.of(
Schema.of(
com.google.cloud.bigquery.Field.newBuilder(
"test_string", StandardSQLTypeName.STRING)
.setMode(Field.Mode.NULLABLE)
.build())))
TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema))
.build();
bigquery.create(tableInfo);
}
Expand All @@ -106,8 +90,7 @@ public void tearDown() {

@Test
public void testWriteToDefaultStream() throws Exception {
WriteToDefaultStream.writeToDefaultStream(
GOOGLE_CLOUD_PROJECT, datasetName, tableName, tableSchema);
WriteToDefaultStream.writeToDefaultStream(GOOGLE_CLOUD_PROJECT, datasetName, tableName);
assertThat(bout.toString()).contains("Appended records successfully.");
}
}

0 comments on commit 3e2b009

Please sign in to comment.