Skip to content

Commit

Permalink
feat: StreamWriterV2 will attach stream name in the first request in …
Browse files Browse the repository at this point in the history
…the connection and remove stream name and schema in the following ones.
  • Loading branch information
yayi-google committed Feb 24, 2021
1 parent 237c827 commit 33d860a
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ public void close() {
* It takes requests from waiting queue and sends them to server.
*/
private void appendLoop() {
boolean isFirstRequestInConnection = true;
Deque<AppendRequestAndResponse> localQueue = new LinkedList<AppendRequestAndResponse>();
while (!waitingQueueDrained()) {
this.lock.lock();
Expand All @@ -322,7 +323,11 @@ private void appendLoop() {

// TODO: Add reconnection here.
while (!localQueue.isEmpty()) {
this.streamConnection.send(localQueue.pollFirst().message);
AppendRowsRequest preparedRequest =
prepareRequestBasedOnPosition(
localQueue.pollFirst().message, isFirstRequestInConnection);
this.streamConnection.send(preparedRequest);
isFirstRequestInConnection = false;
}
}

Expand Down Expand Up @@ -371,6 +376,18 @@ private void waitForDoneCallback() {
}
}

private AppendRowsRequest prepareRequestBasedOnPosition(
AppendRowsRequest original, boolean isFirstRequest) {
AppendRowsRequest.Builder requestBuilder = original.toBuilder();
if (isFirstRequest) {
requestBuilder.setWriteStream(this.streamName);
} else {
requestBuilder.clearWriteStream();
requestBuilder.getProtoRowsBuilder().clearWriterSchema();
}
return requestBuilder.build();
}

private void cleanupInflightRequests() {
Throwable finalStatus;
Deque<AppendRequestAndResponse> localQueue = new LinkedList<AppendRequestAndResponse>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigquery.storage.v1beta2;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -192,6 +193,19 @@ public void testAppendSuccess() throws Exception {
}
assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size());

for (int i = 0; i < appendCount; i++) {
AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i);
if (i == 0) {
// First request received by server should have schema and stream name.
assertTrue(serverRequest.getProtoRows().hasWriterSchema());
assertEquals(serverRequest.getWriteStream(), TEST_STREAM);
} else {
// Following request should not have schema and stream name.
assertFalse(serverRequest.getProtoRows().hasWriterSchema());
assertEquals(serverRequest.getWriteStream(), "");
}
}

writer.close();
}

Expand Down

0 comments on commit 33d860a

Please sign in to comment.