Skip to content

Commit

Permalink
fix: refactor only, add StreamWriter to AppendRowsRequestResponse (#1981
Browse files Browse the repository at this point in the history
)

* fix: refactor only, add StreamWriter to AppendRequestResponse, so that we could callback on StreamWriter to manage its close

* .

* .

* .

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
yirutang and gcf-owl-bot[bot] authored Feb 10, 2023
1 parent 760ba6e commit da06a46
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 41 deletions.
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,19 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.6.0')
implementation platform('com.google.cloud:libraries-bom:26.7.0')
implementation 'com.google.cloud:google-cloud-bigquerystorage'
```
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.30.0'
```
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.31.0'
If you are using SBT, add this to your dependencies:
```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.30.0"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.31.0"
```

## Authentication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback;
import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
Expand Down Expand Up @@ -267,16 +268,19 @@ public void run(Throwable finalStatus) {
}

/** Schedules the writing of rows at given offset. */
ApiFuture<AppendRowsResponse> append(
String streamName, ProtoSchema writerSchema, ProtoRows rows, long offset) {
ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows, long offset) {
Preconditions.checkNotNull(streamWriter);
AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
requestBuilder.setProtoRows(
ProtoData.newBuilder().setWriterSchema(writerSchema).setRows(rows).build());
ProtoData.newBuilder()
.setWriterSchema(streamWriter.getProtoSchema())
.setRows(rows)
.build());
if (offset >= 0) {
requestBuilder.setOffset(Int64Value.of(offset));
}
requestBuilder.setWriteStream(streamName);
return appendInternal(requestBuilder.build());
requestBuilder.setWriteStream(streamWriter.getStreamName());
return appendInternal(streamWriter, requestBuilder.build());
}

Boolean isUserClosed() {
Expand All @@ -288,8 +292,9 @@ Boolean isUserClosed() {
}
}

private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message) {
AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message);
private ApiFuture<AppendRowsResponse> appendInternal(
StreamWriter streamWriter, AppendRowsRequest message) {
AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message, streamWriter);
if (requestWrapper.messageSize > getApiMaxRequestBytes()) {
requestWrapper.appendResult.setException(
new StatusRuntimeException(
Expand Down Expand Up @@ -840,10 +845,14 @@ static final class AppendRequestAndResponse {
final AppendRowsRequest message;
final long messageSize;

AppendRequestAndResponse(AppendRowsRequest message) {
// The writer that issues the call of the request.
final StreamWriter streamWriter;

AppendRequestAndResponse(AppendRowsRequest message, StreamWriter streamWriter) {
this.appendResult = SettableApiFuture.create();
this.message = message;
this.messageSize = message.getProtoRows().getSerializedSize();
this.streamWriter = streamWriter;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,7 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows,
}
Stopwatch stopwatch = Stopwatch.createStarted();
ApiFuture<AppendRowsResponse> responseFuture =
connectionWorker.append(
streamWriter.getStreamName(), streamWriter.getProtoSchema(), rows, offset);
connectionWorker.append(streamWriter, rows, offset);
return ApiFutures.transform(
responseFuture,
// Add callback for update schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ public enum Kind {
public ApiFuture<AppendRowsResponse> append(
StreamWriter streamWriter, ProtoRows protoRows, long offset) {
if (getKind() == Kind.CONNECTION_WORKER) {
return connectionWorker()
.append(streamWriter.getStreamName(), streamWriter.getProtoSchema(), protoRows, offset);
return connectionWorker().append(streamWriter, protoRows, offset);
} else {
return connectionWorkerPool().append(streamWriter, protoRows, offset);
}
Expand Down Expand Up @@ -376,7 +375,7 @@ public ApiFuture<AppendRowsResponse> append(ProtoRows rows) {
public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
if (userClosed.get()) {
AppendRequestAndResponse requestWrapper =
new AppendRequestAndResponse(AppendRowsRequest.newBuilder().build());
new AppendRequestAndResponse(AppendRowsRequest.newBuilder().build(), this);
requestWrapper.appendResult.setException(
new Exceptions.StreamWriterClosedException(
Status.fromCode(Status.Code.FAILED_PRECONDITION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ public void testMultiplexedAppendSuccess() throws Exception {
testBigQueryWrite.addResponse(createAppendResponse(i));
}
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
StreamWriter sw1 =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema("foo"))
.build();
StreamWriter sw2 =
StreamWriter.newBuilder(TEST_STREAM_2, client)
.setWriterSchema(createProtoSchema("complicate"))
.build();
// We do a pattern of:
// send to stream1, string1
// send to stream1, string2
Expand All @@ -95,8 +103,7 @@ public void testMultiplexedAppendSuccess() throws Exception {
futures.add(
sendTestMessage(
connectionWorker,
TEST_STREAM_1,
createProtoSchema("foo"),
sw1,
createFooProtoRows(new String[] {String.valueOf(i)}),
i));
break;
Expand All @@ -105,8 +112,7 @@ public void testMultiplexedAppendSuccess() throws Exception {
futures.add(
sendTestMessage(
connectionWorker,
TEST_STREAM_2,
createProtoSchema("complicate"),
sw2,
createComplicateTypeProtoRows(new String[] {String.valueOf(i)}),
i));
break;
Expand Down Expand Up @@ -197,23 +203,27 @@ public void testAppendInSameStream_switchSchema() throws Exception {
// send to stream1, schema3
// send to stream1, schema1
// ...
StreamWriter sw1 =
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
StreamWriter sw2 =
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema2).build();
StreamWriter sw3 =
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema3).build();
for (long i = 0; i < appendCount; i++) {
switch ((int) i % 4) {
case 0:
futures.add(
sendTestMessage(
connectionWorker,
TEST_STREAM_1,
schema1,
sw1,
createFooProtoRows(new String[] {String.valueOf(i)}),
i));
break;
case 1:
futures.add(
sendTestMessage(
connectionWorker,
TEST_STREAM_1,
schema2,
sw2,
createFooProtoRows(new String[] {String.valueOf(i)}),
i));
break;
Expand All @@ -222,8 +232,7 @@ public void testAppendInSameStream_switchSchema() throws Exception {
futures.add(
sendTestMessage(
connectionWorker,
TEST_STREAM_1,
schema3,
sw3,
createFooProtoRows(new String[] {String.valueOf(i)}),
i));
break;
Expand Down Expand Up @@ -293,6 +302,9 @@ public void testAppendInSameStream_switchSchema() throws Exception {

@Test
public void testAppendButInflightQueueFull() throws Exception {
ProtoSchema schema1 = createProtoSchema("foo");
StreamWriter sw1 =
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
ConnectionWorker connectionWorker =
new ConnectionWorker(
TEST_STREAM_1,
Expand All @@ -305,7 +317,6 @@ public void testAppendButInflightQueueFull() throws Exception {
client.getSettings());
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
ConnectionWorker.setMaxInflightQueueWaitTime(500);
ProtoSchema schema1 = createProtoSchema("foo");

long appendCount = 6;
for (int i = 0; i < appendCount; i++) {
Expand All @@ -322,23 +333,15 @@ public void testAppendButInflightQueueFull() throws Exception {
StatusRuntimeException.class,
() -> {
sendTestMessage(
connectionWorker,
TEST_STREAM_1,
schema1,
createFooProtoRows(new String[] {String.valueOf(5)}),
5);
connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(5)}), 5);
});
long timeDiff = System.currentTimeMillis() - startTime;
assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), 5);
assertTrue(timeDiff > 500);
} else {
futures.add(
sendTestMessage(
connectionWorker,
TEST_STREAM_1,
schema1,
createFooProtoRows(new String[] {String.valueOf(i)}),
i));
connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i));
assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), i + 1);
}
}
Expand Down Expand Up @@ -396,11 +399,10 @@ private ProtoSchema createProtoSchema(String protoName) {

private ApiFuture<AppendRowsResponse> sendTestMessage(
ConnectionWorker connectionWorker,
String streamName,
ProtoSchema protoSchema,
StreamWriter streamWriter,
ProtoRows protoRows,
long offset) {
return connectionWorker.append(streamName, protoSchema, protoRows, offset);
return connectionWorker.append(streamWriter, protoRows, offset);
}

private ProtoRows createFooProtoRows(String[] messages) {
Expand Down

0 comments on commit da06a46

Please sign in to comment.