diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 7078fd6da1..a0f3f807d7 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -246,6 +246,15 @@ public Descriptor getDescriptor() { return this.descriptor; } + /** + * Gets the location of the destination + * + * @return Descriptor + */ + public String getLocation() { + return this.streamWriter.getLocation(); + } + /** * Returns the wait of a request in Client side before sending to the Server. Request could wait * in Client because it reached the client side inflight request limit (adjustable when @@ -407,6 +416,7 @@ private Builder(String streamOrTableName, TableSchema tableSchema, BigQueryWrite TableSchema writeStreamTableSchema = writeStream.getTableSchema(); this.tableSchema = writeStreamTableSchema; + this.location = writeStream.getLocation(); } else { this.tableSchema = tableSchema; } @@ -526,6 +536,10 @@ public Builder setEnableConnectionPool(boolean enableConnectionPool) { * @return Builder */ public Builder setLocation(String location) { + if (this.location != null && !this.location.equals(location)) { + throw new IllegalArgumentException( + "Specified location " + location + " does not match the system value " + this.location); + } this.location = location; return this; } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index be6c10dff8..46b9e141bf 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -51,6 +51,11 @@ public class StreamWriter implements AutoCloseable { /** Every writer has a fixed proto schema. */ private final ProtoSchema writerSchema; + /* + * Location of the destination. + */ + private final String location; + /* * A String that uniquely identifies this writer. */ @@ -162,6 +167,7 @@ private StreamWriter(Builder builder) throws IOException { BigQueryWriteClient client; this.streamName = builder.streamName; this.writerSchema = builder.writerSchema; + this.location = builder.location; boolean ownsBigQueryWriteClient; if (builder.client == null) { BigQueryWriteSettings stubSettings = @@ -193,7 +199,7 @@ private StreamWriter(Builder builder) throws IOException { client, ownsBigQueryWriteClient)); } else { - if (builder.location == "") { + if (builder.location == null || builder.location.isEmpty()) { throw new IllegalArgumentException("Location must be specified for multiplexing client!"); } // Assume the connection in the same pool share the same client and trace id. @@ -318,6 +324,11 @@ public ProtoSchema getProtoSchema() { return writerSchema; } + /** @return the location of the destination. */ + public String getLocation() { + return location; + } + /** Close the stream writer. Shut down all resources. */ @Override public void close() { @@ -379,7 +390,7 @@ public static final class Builder { private TableSchema updatedTableSchema = null; - private String location; + private String location = null; private boolean enableConnectionPool = false; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index 2398415269..c4f788482b 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -393,15 +393,48 @@ public void testAppendOutOfRangeException() throws Exception { public void testCreateDefaultStream() throws Exception { TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).addFields(1, TEST_STRING).build(); + testBigQueryWrite.addResponse( + WriteStream.newBuilder() + .setName(TEST_STREAM) + .setLocation("aa") + .setTableSchema(tableSchema) + .build()); try (JsonStreamWriter writer = - JsonStreamWriter.newBuilder(TEST_TABLE, tableSchema) + JsonStreamWriter.newBuilder(TEST_TABLE, client) .setChannelProvider(channelProvider) .setCredentialsProvider(NoCredentialsProvider.create()) .build()) { assertEquals("projects/p/datasets/d/tables/t/_default", writer.getStreamName()); + assertEquals("aa", writer.getLocation()); } } + @Test + public void testCreateDefaultStreamWrongLocation() throws Exception { + TableSchema tableSchema = + TableSchema.newBuilder().addFields(0, TEST_INT).addFields(1, TEST_STRING).build(); + testBigQueryWrite.addResponse( + WriteStream.newBuilder() + .setName(TEST_STREAM) + .setLocation("aa") + .setTableSchema(tableSchema) + .build()); + IllegalArgumentException ex = + assertThrows( + IllegalArgumentException.class, + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + JsonStreamWriter.newBuilder(TEST_TABLE, client) + .setChannelProvider(channelProvider) + .setCredentialsProvider(NoCredentialsProvider.create()) + .setLocation("bb") + .build(); + } + }); + assertEquals("Specified location bb does not match the system value aa", ex.getMessage()); + } + @Test public void testSimpleSchemaUpdate() throws Exception { try (JsonStreamWriter writer =