Skip to content

Commit

Permalink
feat: populate location info if we already called GetWriteStream (#1802)
Browse files Browse the repository at this point in the history
* feat: add get location support

* .

* add some check on set location if there is already something set
  • Loading branch information
yirutang authored Sep 27, 2022
1 parent d8aaed5 commit 5f43103
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,15 @@ public Descriptor getDescriptor() {
return this.descriptor;
}

/**
* Gets the location of the destination
*
* @return Descriptor
*/
public String getLocation() {
return this.streamWriter.getLocation();
}

/**
* Returns the wait of a request in Client side before sending to the Server. Request could wait
* in Client because it reached the client side inflight request limit (adjustable when
Expand Down Expand Up @@ -407,6 +416,7 @@ private Builder(String streamOrTableName, TableSchema tableSchema, BigQueryWrite
TableSchema writeStreamTableSchema = writeStream.getTableSchema();

this.tableSchema = writeStreamTableSchema;
this.location = writeStream.getLocation();
} else {
this.tableSchema = tableSchema;
}
Expand Down Expand Up @@ -526,6 +536,10 @@ public Builder setEnableConnectionPool(boolean enableConnectionPool) {
* @return Builder
*/
public Builder setLocation(String location) {
if (this.location != null && !this.location.equals(location)) {
throw new IllegalArgumentException(
"Specified location " + location + " does not match the system value " + this.location);
}
this.location = location;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public class StreamWriter implements AutoCloseable {
/** Every writer has a fixed proto schema. */
private final ProtoSchema writerSchema;

/*
* Location of the destination.
*/
private final String location;

/*
* A String that uniquely identifies this writer.
*/
Expand Down Expand Up @@ -162,6 +167,7 @@ private StreamWriter(Builder builder) throws IOException {
BigQueryWriteClient client;
this.streamName = builder.streamName;
this.writerSchema = builder.writerSchema;
this.location = builder.location;
boolean ownsBigQueryWriteClient;
if (builder.client == null) {
BigQueryWriteSettings stubSettings =
Expand Down Expand Up @@ -193,7 +199,7 @@ private StreamWriter(Builder builder) throws IOException {
client,
ownsBigQueryWriteClient));
} else {
if (builder.location == "") {
if (builder.location == null || builder.location.isEmpty()) {
throw new IllegalArgumentException("Location must be specified for multiplexing client!");
}
// Assume the connection in the same pool share the same client and trace id.
Expand Down Expand Up @@ -318,6 +324,11 @@ public ProtoSchema getProtoSchema() {
return writerSchema;
}

/** @return the location of the destination. */
public String getLocation() {
return location;
}

/** Close the stream writer. Shut down all resources. */
@Override
public void close() {
Expand Down Expand Up @@ -379,7 +390,7 @@ public static final class Builder {

private TableSchema updatedTableSchema = null;

private String location;
private String location = null;

private boolean enableConnectionPool = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,15 +393,48 @@ public void testAppendOutOfRangeException() throws Exception {
public void testCreateDefaultStream() throws Exception {
TableSchema tableSchema =
TableSchema.newBuilder().addFields(0, TEST_INT).addFields(1, TEST_STRING).build();
testBigQueryWrite.addResponse(
WriteStream.newBuilder()
.setName(TEST_STREAM)
.setLocation("aa")
.setTableSchema(tableSchema)
.build());
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(TEST_TABLE, tableSchema)
JsonStreamWriter.newBuilder(TEST_TABLE, client)
.setChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.build()) {
assertEquals("projects/p/datasets/d/tables/t/_default", writer.getStreamName());
assertEquals("aa", writer.getLocation());
}
}

@Test
public void testCreateDefaultStreamWrongLocation() throws Exception {
TableSchema tableSchema =
TableSchema.newBuilder().addFields(0, TEST_INT).addFields(1, TEST_STRING).build();
testBigQueryWrite.addResponse(
WriteStream.newBuilder()
.setName(TEST_STREAM)
.setLocation("aa")
.setTableSchema(tableSchema)
.build());
IllegalArgumentException ex =
assertThrows(
IllegalArgumentException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
JsonStreamWriter.newBuilder(TEST_TABLE, client)
.setChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.setLocation("bb")
.build();
}
});
assertEquals("Specified location bb does not match the system value aa", ex.getMessage());
}

@Test
public void testSimpleSchemaUpdate() throws Exception {
try (JsonStreamWriter writer =
Expand Down

0 comments on commit 5f43103

Please sign in to comment.