From b365503441ef81f8fe0a97b22bd0e3796ef6d2c4 Mon Sep 17 00:00:00 2001 From: yayi Date: Sat, 20 Feb 2021 17:35:16 -0800 Subject: [PATCH 1/6] Add a new StreamWriterV2. Compared to existing StreamWriter, its locking mechanism is much simpler. --- google-cloud-bigquerystorage/pom.xml | 8 + .../storage/v1beta2/StreamConnection.java | 104 ++++++ .../storage/v1beta2/StreamWriterV2.java | 352 ++++++++++++++++++ .../storage/v1beta2/FakeBigQueryWrite.java | 4 + .../v1beta2/FakeBigQueryWriteImpl.java | 14 + .../storage/v1beta2/StreamWriterV2Test.java | 283 ++++++++++++++ 6 files changed, 765 insertions(+) create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamConnection.java create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java diff --git a/google-cloud-bigquerystorage/pom.xml b/google-cloud-bigquerystorage/pom.xml index 28d5248fa2..b3bc46815c 100644 --- a/google-cloud-bigquerystorage/pom.xml +++ b/google-cloud-bigquerystorage/pom.xml @@ -44,6 +44,14 @@ org.codehaus.mojo flatten-maven-plugin + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamConnection.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamConnection.java new file mode 100644 index 0000000000..c04f43c944 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamConnection.java @@ -0,0 +1,104 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1beta2; + +import com.google.api.gax.rpc.BidiStreamingCallable; +import com.google.api.gax.rpc.ClientStream; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.StreamController; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.StatusRuntimeException; + +/** + * StreamConnection is responsible for writing requests to a GRPC bidirecional connection. + * + *

StreamWriter creates a connection. Two callback functions are necessary: request_callback and + * done_callback. Request callback is used for every request, and done callback is used to notify + * the user that the connection is closed and no more callbacks will be received from this + * connection. + * + *

The stream writer will accept all the requests without flow control, and makes the callbacks + * in receiving order. + * + *

It's user's responsibility to do the flow control and maintain the lifetime of the requests. + */ +public class StreamConnection { + private BidiStreamingCallable bidiStreamingCallable; + private ClientStream clientStream; + + private RequestCallback requestCallback; + private DoneCallback doneCallback; + + public StreamConnection( + BigQueryWriteClient client, RequestCallback requestCallback, DoneCallback doneCallback) { + this.requestCallback = requestCallback; + this.doneCallback = doneCallback; + + bidiStreamingCallable = client.appendRowsCallable(); + clientStream = + bidiStreamingCallable.splitCall( + new ResponseObserver() { + + @Override + public void onStart(StreamController controller) { + // no-op + } + + @Override + public void onResponse(AppendRowsResponse response) { + StreamConnection.this.requestCallback.run(response); + } + + @Override + public void onError(Throwable t) { + StreamConnection.this.doneCallback.run(t); + } + + @Override + public void onComplete() { + StreamConnection.this.doneCallback.run( + new StatusRuntimeException( + Status.fromCode(Code.CANCELLED) + .withDescription("Stream is closed by user."))); + } + }); + } + + /** + * Sends a request to the bi-directional stream connection. + * + * @param request request to send. + */ + public void send(AppendRowsRequest request) { + clientStream.send(request); + } + + /** Close the bi-directional stream connection. */ + public void close() { + clientStream.closeSend(); + } + + /** Invoked when a response is received from the server. */ + public static interface RequestCallback { + public void run(AppendRowsResponse response); + } + + /** Invoked when server closes the connection. */ + public static interface DoneCallback { + public void run(Throwable finalStatus); + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java new file mode 100644 index 0000000000..0a7c158c64 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -0,0 +1,352 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1beta2; + +import com.google.api.core.ApiFuture; +import com.google.api.core.SettableApiFuture; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Uninterruptibles; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import java.time.Duration; +import java.util.Deque; +import java.util.LinkedList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Logger; +import javax.annotation.concurrent.GuardedBy; + +/** + * A BigQuery Stream Writer that can be used to write data into BigQuery Table. + * + *

TODO: Add credential support. + * + *

TODO: Attach schema. + * + *

TODO: Add max size check. + * + *

TODO: Add inflight control. + * + *

TODO: Attach traceId. + * + *

TODO: Support batching. + * + *

TODO: Support schema change. + */ +public class StreamWriterV2 implements AutoCloseable { + private static final Logger log = Logger.getLogger(StreamWriterV2.class.getName()); + + private static final Duration DONE_CALLBACK_WAIT_TIMEOUT = Duration.ofMinutes(10); + + private Lock lock; + private Condition hasMessageInWaitingQueue; + + /* + * The identifier of stream to write to. + */ + private final String streamName; + + /* + * Indicates whether user has called Close() or not. + */ + @GuardedBy("lock") + private boolean userClosed = false; + + /* + * The final status of connection. Set to nonnull when connection is permanently closed. + */ + @GuardedBy("lock") + private Throwable connectionFinalStatus = null; + + /* + * Contains requests buffered in the client and not yet sent to server. + */ + @GuardedBy("lock") + private final Deque waitingRequestQueue; + + /* + * Contains sent append requests waiting for response from server. + */ + @GuardedBy("lock") + private final Deque inflightRequestQueue; + + /* + * Wraps the underlying bi-directional stream connection with server. + */ + private StreamConnection streamConnection; + + /* + * A separate thread to handle actual communication with server. + */ + private Thread appendThread; + + private StreamWriterV2(Builder builder) { + this.lock = new ReentrantLock(); + this.hasMessageInWaitingQueue = lock.newCondition(); + this.streamName = builder.streamName; + this.waitingRequestQueue = new LinkedList(); + this.inflightRequestQueue = new LinkedList(); + this.streamConnection = + new StreamConnection(builder.client, this::requestCallback, this::doneCallback); + this.appendThread = new Thread(this::appendLoop); + this.appendThread.start(); + } + + /** + * Schedules the writing of a message. + * + *

Example of writing a message. + * + *

{@code
+   * AppendRowsRequest message;
+   * ApiFuture messageIdFuture = writer.append(message);
+   * ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback() {
+   *   public void onSuccess(AppendRowsResponse response) {
+   *     if (response.hasOffset()) {
+   *       System.out.println("written with offset: " + response.getOffset());
+   *     } else {
+   *       System.out.println("received an in stream error: " + response.error().toString());
+   *     }
+   *   }
+   *
+   *   public void onFailure(Throwable t) {
+   *     System.out.println("failed to write: " + t);
+   *   }
+   * }, MoreExecutors.directExecutor());
+   * }
+ * + * @param message the message in serialized format to write to BigQuery. + * @return the message ID wrapped in a future. + */ + public ApiFuture append(AppendRowsRequest message) { + AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message); + this.lock.lock(); + try { + if (userClosed) { + requestWrapper.appendResult.setException( + new StatusRuntimeException( + Status.fromCode(Status.Code.FAILED_PRECONDITION) + .withDescription("Stream is already closed"))); + return requestWrapper.appendResult; + } + if (connectionFinalStatus != null) { + requestWrapper.appendResult.setException( + new StatusRuntimeException( + Status.fromCode(Status.Code.FAILED_PRECONDITION) + .withDescription( + "Stream is closed due to " + connectionFinalStatus.toString()))); + return requestWrapper.appendResult; + } + waitingRequestQueue.addLast(requestWrapper); + hasMessageInWaitingQueue.signal(); + return requestWrapper.appendResult; + } finally { + this.lock.unlock(); + } + } + + /** Close the stream writer. Shut down all resources. */ + @Override + public void close() { + log.info("User closing stream: " + streamName); + this.lock.lock(); + try { + this.userClosed = true; + } finally { + this.lock.unlock(); + } + log.info("Waiting for append thread to finish. Stream: " + streamName); + try { + appendThread.join(); + log.info("User close complete. Stream: " + streamName); + } catch (InterruptedException e) { + // Unexpected. Just swallow the exception with logging. + log.warning( + "Append handler join is interrupted. Stream: " + streamName + " Error: " + e.toString()); + } + } + + /* + * This loop is executed in a separate thread. + * + * It takes requests from waiting queue and sends them to server. + */ + private void appendLoop() { + Deque localQueue = new LinkedList(); + while (!waitingQueueDrained()) { + this.lock.lock(); + try { + hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS); + while (!this.waitingRequestQueue.isEmpty()) { + localQueue.addLast(this.waitingRequestQueue.pollFirst()); + } + } catch (InterruptedException e) { + log.warning("Interrupted while waiting for message. Error: " + e.toString()); + } finally { + this.lock.unlock(); + } + + if (localQueue.isEmpty()) { + continue; + } + + // TODO: Add reconnection here. + + this.lock.lock(); + try { + while (!localQueue.isEmpty()) { + AppendRequestAndResponse requestWrapper = localQueue.pollFirst(); + this.inflightRequestQueue.addLast(requestWrapper); + this.streamConnection.send(requestWrapper.message); + } + } finally { + this.lock.unlock(); + } + } + + log.info("Cleanup starts. Stream: " + streamName); + // At this point, the waiting queue is drained, so no more requests. + // We can close the stream connection and handle the remaining inflight requests. + this.streamConnection.close(); + + log.info("Waiting for done callback from stream connection. Stream: " + streamName); + long waitDeadlineMs = System.currentTimeMillis() + DONE_CALLBACK_WAIT_TIMEOUT.toMillis(); + while (true) { + if (System.currentTimeMillis() > waitDeadlineMs) { + log.warning( + "Timeout waiting for done wallback. Skip inflight cleanup. Stream: " + streamName); + return; + } + this.lock.lock(); + try { + if (connectionFinalStatus != null) { + // Done callback is received, break. + break; + } + } finally { + this.lock.unlock(); + } + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + + // At this point, there cannot be more callback. It is safe to clean up all inflight requests. + log.info( + "Stream connection is fully closed. Cleaning up inflight requests. Stream: " + streamName); + cleanupInflightRequests(); + log.info("Append thread is done. Stream: " + streamName); + } + + /* + * Returns true if waiting queue is drain, a.k.a. no more requests in the waiting queue. + * + * It serves as a signal to append thread that there cannot be any more requests in the waiting + * queue and it can prepare to stop. + */ + private boolean waitingQueueDrained() { + this.lock.lock(); + try { + return (this.userClosed || this.connectionFinalStatus != null) + && this.waitingRequestQueue.isEmpty(); + } finally { + this.lock.unlock(); + } + } + + private void cleanupInflightRequests() { + Throwable finalStatus; + Deque localQueue = new LinkedList(); + this.lock.lock(); + try { + finalStatus = this.connectionFinalStatus; + while (!this.inflightRequestQueue.isEmpty()) { + localQueue.addLast(this.inflightRequestQueue.pollFirst()); + } + } finally { + this.lock.unlock(); + } + log.info( + "Cleaning " + + localQueue.size() + + " inflight requests with error: " + + finalStatus.toString()); + while (!localQueue.isEmpty()) { + localQueue.pollFirst().appendResult.setException(finalStatus); + } + } + + private void requestCallback(AppendRowsResponse response) { + AppendRequestAndResponse requestWrapper; + this.lock.lock(); + try { + requestWrapper = this.inflightRequestQueue.pollFirst(); + } finally { + this.lock.unlock(); + } + requestWrapper.appendResult.set(response); + } + + private void doneCallback(Throwable finalStatus) { + log.info( + "Received done callback. Stream: " + + streamName + + " Final status: " + + finalStatus.toString()); + this.lock.lock(); + try { + this.connectionFinalStatus = finalStatus; + } finally { + this.lock.unlock(); + } + } + + /** Constructs a new {@link StreamWriterV2.Builder} using the given stream and client. */ + public static StreamWriterV2.Builder newBuilder(String streamName, BigQueryWriteClient client) { + return new StreamWriterV2.Builder(streamName, client); + } + + /** A builder of {@link StreamWriterV2}s. */ + public static final class Builder { + + private String streamName; + + private BigQueryWriteClient client; + + private Builder(String streamName, BigQueryWriteClient client) { + this.streamName = Preconditions.checkNotNull(streamName); + ; + this.client = Preconditions.checkNotNull(client); + ; + } + + /** Builds the {@code StreamWriterV2}. */ + public StreamWriterV2 build() { + return new StreamWriterV2(this); + } + } + + // Class that wraps AppendRowsRequest and its corresponding Response future. + private static final class AppendRequestAndResponse { + final SettableApiFuture appendResult; + final AppendRowsRequest message; + + AppendRequestAndResponse(AppendRowsRequest message) { + this.appendResult = SettableApiFuture.create(); + this.message = message; + } + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWrite.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWrite.java index 618366cfdc..a333260529 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWrite.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWrite.java @@ -79,6 +79,10 @@ public void setResponseDelay(Duration delay) { serviceImpl.setResponseDelay(delay); } + public void setResponseSleep(Duration sleep) { + serviceImpl.setResponseSleep(sleep); + } + public void setExecutor(ScheduledExecutorService executor) { serviceImpl.setExecutor(executor); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWriteImpl.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWriteImpl.java index 7cef4f7483..b99dab99bd 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWriteImpl.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWriteImpl.java @@ -16,6 +16,7 @@ package com.google.cloud.bigquery.storage.v1beta2; import com.google.common.base.Optional; +import com.google.common.util.concurrent.Uninterruptibles; import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.List; @@ -44,6 +45,7 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase { private boolean autoPublishResponse; private ScheduledExecutorService executor = null; private Duration responseDelay = Duration.ZERO; + private Duration responseSleep = Duration.ZERO; /** Class used to save the state of a possible response. */ private static class Response { @@ -121,10 +123,16 @@ public void onNext(AppendRowsRequest value) { LOG.info("Get request:" + value.toString()); final Response response = responses.remove(); requests.add(value); + if (responseSleep.compareTo(Duration.ZERO) > 0) { + LOG.info("Sleeping before response for " + responseSleep.toString()); + Uninterruptibles.sleepUninterruptibly( + responseSleep.toMillis(), TimeUnit.MILLISECONDS); + } if (responseDelay == Duration.ZERO) { sendResponse(response, responseObserver); } else { final Response responseToSend = response; + // TODO(yirutang): This is very wrong because it messes up response/complete ordering. LOG.info("Schedule a response to be sent at delay"); executor.schedule( new Runnable() { @@ -173,6 +181,12 @@ public FakeBigQueryWriteImpl setResponseDelay(Duration responseDelay) { return this; } + /** Set an amount of time by which to sleep before publishing responses. */ + public FakeBigQueryWriteImpl setResponseSleep(Duration responseSleep) { + this.responseSleep = responseSleep; + return this; + } + public FakeBigQueryWriteImpl addResponse(AppendRowsResponse appendRowsResponse) { responses.add(new Response(appendRowsResponse)); return this; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java new file mode 100644 index 0000000000..39f680b374 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -0,0 +1,283 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1beta2; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.testing.MockGrpcService; +import com.google.api.gax.grpc.testing.MockServiceHelper; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.StatusCode.Code; +import com.google.cloud.bigquery.storage.test.Test.FooType; +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Int64Value; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.threeten.bp.Duration; + +@RunWith(JUnit4.class) +public class StreamWriterV2Test { + private static final Logger log = Logger.getLogger(StreamWriterV2Test.class.getName()); + private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s"; + private FakeScheduledExecutorService fakeExecutor; + private FakeBigQueryWrite testBigQueryWrite; + private static MockServiceHelper serviceHelper; + private BigQueryWriteClient client; + + @Before + public void setUp() throws Exception { + testBigQueryWrite = new FakeBigQueryWrite(); + serviceHelper = + new MockServiceHelper( + UUID.randomUUID().toString(), Arrays.asList(testBigQueryWrite)); + serviceHelper.start(); + fakeExecutor = new FakeScheduledExecutorService(); + testBigQueryWrite.setExecutor(fakeExecutor); + client = + BigQueryWriteClient.create( + BigQueryWriteSettings.newBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider(serviceHelper.createChannelProvider()) + .build()); + } + + @After + public void tearDown() throws Exception { + log.info("tearDown called"); + client.close(); + serviceHelper.stop(); + } + + private StreamWriterV2 getTestStreamWriterV2() { + return StreamWriterV2.newBuilder(TEST_STREAM, client).build(); + } + + private AppendRowsRequest createAppendRequest(String[] messages, long offset) { + AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); + AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder(); + dataBuilder.setWriterSchema( + ProtoSchema.newBuilder() + .setProtoDescriptor( + DescriptorProtos.DescriptorProto.newBuilder() + .setName("Message") + .addField( + DescriptorProtos.FieldDescriptorProto.newBuilder() + .setName("foo") + .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING) + .setNumber(1) + .build()) + .build())); + ProtoRows.Builder rows = ProtoRows.newBuilder(); + for (String message : messages) { + FooType foo = FooType.newBuilder().setFoo(message).build(); + rows.addSerializedRows(foo.toByteString()); + } + if (offset > 0) { + requestBuilder.setOffset(Int64Value.of(offset)); + } + return requestBuilder + .setProtoRows(dataBuilder.setRows(rows.build()).build()) + .setWriteStream(TEST_STREAM) + .build(); + } + + private AppendRowsResponse createAppendResponse(long offset) { + return AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(offset)).build()) + .build(); + } + + private ApiFuture sendTestMessage(StreamWriterV2 writer, String[] messages) { + return writer.append(createAppendRequest(messages, -1)); + } + + private static T assertFutureException( + Class expectedThrowable, Future future) { + return assertThrows( + expectedThrowable, + () -> { + try { + future.get(); + } catch (ExecutionException ex) { + // Future wraps exception with ExecutionException. So unwrapper it here. + throw ex.getCause(); + } + }); + } + + @Test + public void testAppendSuccess() throws Exception { + StreamWriterV2 writer = getTestStreamWriterV2(); + + long appendCount = 1000; + for (int i = 0; i < appendCount; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + + List> futures = new ArrayList<>(); + for (int i = 0; i < appendCount; i++) { + futures.add(sendTestMessage(writer, new String[] {String.valueOf(i)})); + } + + for (int i = 0; i < appendCount; i++) { + assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue()); + } + assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size()); + + writer.close(); + } + + @Test + public void testAppendSuccessAndError() throws Exception { + StreamWriterV2 writer = getTestStreamWriterV2(); + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addException(Status.INTERNAL.asException()); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + ApiException actualError = assertFutureException(ApiException.class, appendFuture2); + assertEquals(Code.INTERNAL, actualError.getStatusCode().getCode()); + + writer.close(); + } + + @Test + public void longIdleBetweenAppends() throws Exception { + StreamWriterV2 writer = getTestStreamWriterV2(); + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addResponse(createAppendResponse(1)); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + + // Sleep to create a long idle between appends. + TimeUnit.SECONDS.sleep(3); + + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + assertEquals(1, appendFuture2.get().getAppendResult().getOffset().getValue()); + + writer.close(); + } + + @Test + public void testAppendAfterUserClose() throws Exception { + StreamWriterV2 writer = getTestStreamWriterV2(); + testBigQueryWrite.addResponse(createAppendResponse(0)); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + writer.close(); + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertTrue(appendFuture2.isDone()); + StatusRuntimeException actualError = + assertFutureException(StatusRuntimeException.class, appendFuture2); + assertEquals(Status.Code.FAILED_PRECONDITION, actualError.getStatus().getCode()); + } + + @Test + public void testAppendAfterServerClose() throws Exception { + StreamWriterV2 writer = getTestStreamWriterV2(); + testBigQueryWrite.addException(Status.INTERNAL.asException()); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + ApiException error1 = assertFutureException(ApiException.class, appendFuture1); + assertEquals(Code.INTERNAL, error1.getStatusCode().getCode()); + + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + assertTrue(appendFuture2.isDone()); + StatusRuntimeException error2 = + assertFutureException(StatusRuntimeException.class, appendFuture2); + assertEquals(Status.Code.FAILED_PRECONDITION, error2.getStatus().getCode()); + + writer.close(); + } + + @Test + public void userCloseWhileRequestInflight() throws Exception { + StreamWriterV2 writer = getTestStreamWriterV2(); + // Server will sleep 2 seconds before sending back the response. + testBigQueryWrite.setResponseSleep(Duration.ofSeconds(2)); + testBigQueryWrite.addResponse(createAppendResponse(0)); + + // Send a request and close the stream in separate thread while the request is inflight. + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + Thread closeThread = + new Thread( + () -> { + writer.close(); + }); + closeThread.start(); + + // Due to the sleep on server, the append won't finish within 1 second even though stream + // is being closed. + assertThrows( + TimeoutException.class, + () -> { + appendFuture1.get(1, TimeUnit.SECONDS); + }); + + // Within 2 seconds, the request should be done and stream should be closed. + closeThread.join(2000); + assertTrue(appendFuture1.isDone()); + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + } + + @Test + public void serverCloseWhileRequestsInflight() throws Exception { + StreamWriterV2 writer = getTestStreamWriterV2(); + // Server will sleep 2 seconds before closing the connection. + testBigQueryWrite.setResponseSleep(Duration.ofSeconds(2)); + testBigQueryWrite.addException(Status.INTERNAL.asException()); + + // Send 10 requests, so that there are 10 inflight requests. + int appendCount = 10; + List> futures = new ArrayList<>(); + for (int i = 0; i < appendCount; i++) { + futures.add(sendTestMessage(writer, new String[] {String.valueOf(i)})); + } + + // Server close should properly handle all inflight requests. + for (int i = 0; i < appendCount; i++) { + ApiException actualError = assertFutureException(ApiException.class, futures.get(i)); + assertEquals(Code.INTERNAL, actualError.getStatusCode().getCode()); + } + + writer.close(); + ; + } +} From 33124023922020abeeca875333906037bec2f7f4 Mon Sep 17 00:00:00 2001 From: yayi Date: Mon, 22 Feb 2021 12:04:51 -0800 Subject: [PATCH 2/6] Stop using Java8 features as we still need to support Java7 --- google-cloud-bigquerystorage/pom.xml | 8 ----- .../storage/v1beta2/StreamWriterV2.java | 23 +++++++++--- .../storage/v1beta2/StreamWriterV2Test.java | 36 ++++++++++++------- 3 files changed, 42 insertions(+), 25 deletions(-) diff --git a/google-cloud-bigquerystorage/pom.xml b/google-cloud-bigquerystorage/pom.xml index b3bc46815c..28d5248fa2 100644 --- a/google-cloud-bigquerystorage/pom.xml +++ b/google-cloud-bigquerystorage/pom.xml @@ -44,14 +44,6 @@ org.codehaus.mojo flatten-maven-plugin - - org.apache.maven.plugins - maven-compiler-plugin - - 8 - 8 - - diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 0a7c158c64..5b58dccae8 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -17,6 +17,8 @@ import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; +import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.DoneCallback; +import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.RequestCallback; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Uninterruptibles; import io.grpc.Status; @@ -102,8 +104,23 @@ private StreamWriterV2(Builder builder) { this.waitingRequestQueue = new LinkedList(); this.inflightRequestQueue = new LinkedList(); this.streamConnection = - new StreamConnection(builder.client, this::requestCallback, this::doneCallback); - this.appendThread = new Thread(this::appendLoop); + new StreamConnection(builder.client, new RequestCallback() { + @Override + public void run(AppendRowsResponse response) { + requestCallback(response); + } + }, new DoneCallback() { + @Override + public void run(Throwable finalStatus) { + doneCallback(finalStatus); + } + }); + this.appendThread = new Thread(new Runnable() { + @Override + public void run() { + appendLoop(); + } + }); this.appendThread.start(); } @@ -328,9 +345,7 @@ public static final class Builder { private Builder(String streamName, BigQueryWriteClient client) { this.streamName = Preconditions.checkNotNull(streamName); - ; this.client = Preconditions.checkNotNull(client); - ; } /** Builds the {@code StreamWriterV2}. */ diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java index 39f680b374..c50e5abb70 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -42,6 +42,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.function.ThrowingRunnable; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.threeten.bp.Duration; @@ -124,15 +125,18 @@ private ApiFuture sendTestMessage(StreamWriterV2 writer, Str } private static T assertFutureException( - Class expectedThrowable, Future future) { + Class expectedThrowable, final Future future) { return assertThrows( expectedThrowable, - () -> { - try { - future.get(); - } catch (ExecutionException ex) { - // Future wraps exception with ExecutionException. So unwrapper it here. - throw ex.getCause(); + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + try { + future.get(); + } catch (ExecutionException ex) { + // Future wraps exception with ExecutionException. So unwrapper it here. + throw ex.getCause(); + } } }); } @@ -229,17 +233,20 @@ public void testAppendAfterServerClose() throws Exception { @Test public void userCloseWhileRequestInflight() throws Exception { - StreamWriterV2 writer = getTestStreamWriterV2(); + final StreamWriterV2 writer = getTestStreamWriterV2(); // Server will sleep 2 seconds before sending back the response. testBigQueryWrite.setResponseSleep(Duration.ofSeconds(2)); testBigQueryWrite.addResponse(createAppendResponse(0)); // Send a request and close the stream in separate thread while the request is inflight. - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + final ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); Thread closeThread = new Thread( - () -> { - writer.close(); + new Runnable() { + @Override + public void run() { + writer.close(); + } }); closeThread.start(); @@ -247,8 +254,11 @@ public void userCloseWhileRequestInflight() throws Exception { // is being closed. assertThrows( TimeoutException.class, - () -> { - appendFuture1.get(1, TimeUnit.SECONDS); + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + appendFuture1.get(1, TimeUnit.SECONDS); + } }); // Within 2 seconds, the request should be done and stream should be closed. From b98d6be0a648aab4aed280142508389d89a03dce Mon Sep 17 00:00:00 2001 From: yayi Date: Mon, 22 Feb 2021 12:24:15 -0800 Subject: [PATCH 3/6] Do not hold lock while sending requests, and some minor refactoring. --- .../storage/v1beta2/StreamWriterV2.java | 101 +++++++++--------- 1 file changed, 49 insertions(+), 52 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 5b58dccae8..10ed99d137 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -23,7 +23,6 @@ import com.google.common.util.concurrent.Uninterruptibles; import io.grpc.Status; import io.grpc.StatusRuntimeException; -import java.time.Duration; import java.util.Deque; import java.util.LinkedList; import java.util.concurrent.TimeUnit; @@ -53,8 +52,6 @@ public class StreamWriterV2 implements AutoCloseable { private static final Logger log = Logger.getLogger(StreamWriterV2.class.getName()); - private static final Duration DONE_CALLBACK_WAIT_TIMEOUT = Duration.ofMinutes(10); - private Lock lock; private Condition hasMessageInWaitingQueue; @@ -104,23 +101,28 @@ private StreamWriterV2(Builder builder) { this.waitingRequestQueue = new LinkedList(); this.inflightRequestQueue = new LinkedList(); this.streamConnection = - new StreamConnection(builder.client, new RequestCallback() { - @Override - public void run(AppendRowsResponse response) { - requestCallback(response); - } - }, new DoneCallback() { - @Override - public void run(Throwable finalStatus) { - doneCallback(finalStatus); - } - }); - this.appendThread = new Thread(new Runnable() { - @Override - public void run() { - appendLoop(); - } - }); + new StreamConnection( + builder.client, + new RequestCallback() { + @Override + public void run(AppendRowsResponse response) { + requestCallback(response); + } + }, + new DoneCallback() { + @Override + public void run(Throwable finalStatus) { + doneCallback(finalStatus); + } + }); + this.appendThread = + new Thread( + new Runnable() { + @Override + public void run() { + appendLoop(); + } + }); this.appendThread.start(); } @@ -210,10 +212,16 @@ private void appendLoop() { try { hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS); while (!this.waitingRequestQueue.isEmpty()) { - localQueue.addLast(this.waitingRequestQueue.pollFirst()); + AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst(); + this.inflightRequestQueue.addLast(requestWrapper); + localQueue.addLast(requestWrapper); } } catch (InterruptedException e) { - log.warning("Interrupted while waiting for message. Error: " + e.toString()); + log.warning( + "Interrupted while waiting for message. Stream: " + + streamName + + " Error: " + + e.toString()); } finally { this.lock.unlock(); } @@ -223,16 +231,8 @@ private void appendLoop() { } // TODO: Add reconnection here. - - this.lock.lock(); - try { - while (!localQueue.isEmpty()) { - AppendRequestAndResponse requestWrapper = localQueue.pollFirst(); - this.inflightRequestQueue.addLast(requestWrapper); - this.streamConnection.send(requestWrapper.message); - } - } finally { - this.lock.unlock(); + while (!localQueue.isEmpty()) { + this.streamConnection.send(localQueue.pollFirst().message); } } @@ -240,26 +240,7 @@ private void appendLoop() { // At this point, the waiting queue is drained, so no more requests. // We can close the stream connection and handle the remaining inflight requests. this.streamConnection.close(); - - log.info("Waiting for done callback from stream connection. Stream: " + streamName); - long waitDeadlineMs = System.currentTimeMillis() + DONE_CALLBACK_WAIT_TIMEOUT.toMillis(); - while (true) { - if (System.currentTimeMillis() > waitDeadlineMs) { - log.warning( - "Timeout waiting for done wallback. Skip inflight cleanup. Stream: " + streamName); - return; - } - this.lock.lock(); - try { - if (connectionFinalStatus != null) { - // Done callback is received, break. - break; - } - } finally { - this.lock.unlock(); - } - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - } + waitForDoneCallback(); // At this point, there cannot be more callback. It is safe to clean up all inflight requests. log.info( @@ -284,6 +265,22 @@ private boolean waitingQueueDrained() { } } + private void waitForDoneCallback() { + log.info("Waiting for done callback from stream connection. Stream: " + streamName); + while (true) { + this.lock.lock(); + try { + if (connectionFinalStatus != null) { + // Done callback is received, return. + return; + } + } finally { + this.lock.unlock(); + } + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + } + private void cleanupInflightRequests() { Throwable finalStatus; Deque localQueue = new LinkedList(); From c0318c97eccb1aa250bd822be9bed95cc80bd273 Mon Sep 17 00:00:00 2001 From: yayi Date: Mon, 22 Feb 2021 18:43:33 -0800 Subject: [PATCH 4/6] Support inflight control with blocking behavior as desired by dataflow sink. --- .../storage/v1beta2/StreamWriterV2.java | 83 ++++++++++++++++++- .../storage/v1beta2/StreamWriterV2Test.java | 77 ++++++++++++++++- 2 files changed, 155 insertions(+), 5 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 10ed99d137..4dad1064c9 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Uninterruptibles; import io.grpc.Status; +import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import java.util.Deque; import java.util.LinkedList; @@ -41,8 +42,6 @@ * *

TODO: Add max size check. * - *

TODO: Add inflight control. - * *

TODO: Attach traceId. * *

TODO: Support batching. @@ -54,12 +53,35 @@ public class StreamWriterV2 implements AutoCloseable { private Lock lock; private Condition hasMessageInWaitingQueue; + private Condition inflightReduced; /* * The identifier of stream to write to. */ private final String streamName; + /* + * Max allowed inflight requests in the stream. Method append is blocked at this. + */ + private final long maxInflightRequests; + + /* + * Max allowed inflight bytes in the stream. Method append is blocked at this. + */ + private final long maxInflightBytes; + + /* + * Tracks current inflight requests in the stream. + */ + @GuardedBy("lock") + private long inflightRequests = 0; + + /* + * Tracks current inflight bytes in the stream. + */ + @GuardedBy("lock") + private long inflightBytes = 0; + /* * Indicates whether user has called Close() or not. */ @@ -97,7 +119,10 @@ public class StreamWriterV2 implements AutoCloseable { private StreamWriterV2(Builder builder) { this.lock = new ReentrantLock(); this.hasMessageInWaitingQueue = lock.newCondition(); + this.inflightReduced = lock.newCondition(); this.streamName = builder.streamName; + this.maxInflightRequests = builder.maxInflightRequest; + this.maxInflightBytes = builder.maxInflightBytes; this.waitingRequestQueue = new LinkedList(); this.inflightRequestQueue = new LinkedList(); this.streamConnection = @@ -171,8 +196,29 @@ public ApiFuture append(AppendRowsRequest message) { "Stream is closed due to " + connectionFinalStatus.toString()))); return requestWrapper.appendResult; } + + ++this.inflightRequests; + this.inflightBytes += requestWrapper.messageSize; waitingRequestQueue.addLast(requestWrapper); hasMessageInWaitingQueue.signal(); + + // Maybe block until we are below inflight limit. + while (this.inflightRequests >= this.maxInflightRequests + || this.inflightBytes >= this.maxInflightBytes) { + try { + inflightReduced.await(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.warning( + "Interrupted while waiting for inflight quota. Stream: " + + streamName + + " Error: " + + e.toString()); + throw new StatusRuntimeException( + Status.fromCode(Code.CANCELLED) + .withCause(e) + .withDescription("Interrupted while waiting for quota.")); + } + } return requestWrapper.appendResult; } finally { this.lock.unlock(); @@ -288,7 +334,7 @@ private void cleanupInflightRequests() { try { finalStatus = this.connectionFinalStatus; while (!this.inflightRequestQueue.isEmpty()) { - localQueue.addLast(this.inflightRequestQueue.pollFirst()); + localQueue.addLast(pollInflightRequestQueue()); } } finally { this.lock.unlock(); @@ -307,7 +353,7 @@ private void requestCallback(AppendRowsResponse response) { AppendRequestAndResponse requestWrapper; this.lock.lock(); try { - requestWrapper = this.inflightRequestQueue.pollFirst(); + requestWrapper = pollInflightRequestQueue(); } finally { this.lock.unlock(); } @@ -328,6 +374,15 @@ private void doneCallback(Throwable finalStatus) { } } + @GuardedBy("lock") + private AppendRequestAndResponse pollInflightRequestQueue() { + AppendRequestAndResponse requestWrapper = this.inflightRequestQueue.pollFirst(); + --this.inflightRequests; + this.inflightBytes -= requestWrapper.messageSize; + this.inflightReduced.signal(); + return requestWrapper; + } + /** Constructs a new {@link StreamWriterV2.Builder} using the given stream and client. */ public static StreamWriterV2.Builder newBuilder(String streamName, BigQueryWriteClient client) { return new StreamWriterV2.Builder(streamName, client); @@ -336,15 +391,33 @@ public static StreamWriterV2.Builder newBuilder(String streamName, BigQueryWrite /** A builder of {@link StreamWriterV2}s. */ public static final class Builder { + private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L; + + private static final long DEFAULT_MAX_INFLIGHT_BYTES = 100 * 1024 * 1024; // 100Mb. + private String streamName; private BigQueryWriteClient client; + private long maxInflightRequest = DEFAULT_MAX_INFLIGHT_REQUESTS; + + private long maxInflightBytes = DEFAULT_MAX_INFLIGHT_BYTES; + private Builder(String streamName, BigQueryWriteClient client) { this.streamName = Preconditions.checkNotNull(streamName); this.client = Preconditions.checkNotNull(client); } + public Builder setMaxInflightRequests(long value) { + this.maxInflightRequest = value; + return this; + } + + public Builder setMaxInflightBytes(long value) { + this.maxInflightBytes = value; + return this; + } + /** Builds the {@code StreamWriterV2}. */ public StreamWriterV2 build() { return new StreamWriterV2(this); @@ -355,10 +428,12 @@ public StreamWriterV2 build() { private static final class AppendRequestAndResponse { final SettableApiFuture appendResult; final AppendRowsRequest message; + final long messageSize; AppendRequestAndResponse(AppendRowsRequest message) { this.appendResult = SettableApiFuture.create(); this.message = message; + this.messageSize = message.getProtoRows().getSerializedSize(); } } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java index c50e5abb70..53891272b8 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -141,6 +141,22 @@ public void run() throws Throwable { }); } + private void verifyAppendIsBlocked(final StreamWriterV2 writer) throws Exception { + Thread appendThread = + new Thread( + new Runnable() { + @Override + public void run() { + sendTestMessage(writer, new String[] {"A"}); + } + }); + // Start a separate thread to append and verify that it is still alive after 2 seoncds. + appendThread.start(); + TimeUnit.SECONDS.sleep(2); + assertTrue(appendThread.isAlive()); + appendThread.interrupt(); + } + @Test public void testAppendSuccess() throws Exception { StreamWriterV2 writer = getTestStreamWriterV2(); @@ -288,6 +304,65 @@ public void serverCloseWhileRequestsInflight() throws Exception { } writer.close(); - ; + } + + @Test + public void testZeroMaxInflightRequests() throws Exception { + StreamWriterV2 writer = + StreamWriterV2.newBuilder(TEST_STREAM, client).setMaxInflightRequests(0).build(); + testBigQueryWrite.addResponse(createAppendResponse(0)); + verifyAppendIsBlocked(writer); + writer.close(); + } + + @Test + public void testZeroMaxInflightBytes() throws Exception { + StreamWriterV2 writer = + StreamWriterV2.newBuilder(TEST_STREAM, client).setMaxInflightBytes(0).build(); + testBigQueryWrite.addResponse(createAppendResponse(0)); + verifyAppendIsBlocked(writer); + writer.close(); + } + + @Test + public void testOneMaxInflightRequests() throws Exception { + StreamWriterV2 writer = + StreamWriterV2.newBuilder(TEST_STREAM, client).setMaxInflightRequests(1).build(); + // Server will sleep 1 second before every response. + testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1)); + testBigQueryWrite.addResponse(createAppendResponse(0)); + + long appendStartTimeMs = System.currentTimeMillis(); + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + long appendElapsedMs = System.currentTimeMillis() - appendStartTimeMs; + assertTrue(appendElapsedMs >= 1000); + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + writer.close(); + } + + @Test + public void testAppendsWithTinyMaxInflightBytes() throws Exception { + StreamWriterV2 writer = + StreamWriterV2.newBuilder(TEST_STREAM, client).setMaxInflightBytes(1).build(); + // Server will sleep 100ms before every response. + testBigQueryWrite.setResponseSleep(Duration.ofMillis(100)); + long appendCount = 10; + for (int i = 0; i < appendCount; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + + List> futures = new ArrayList<>(); + long appendStartTimeMs = System.currentTimeMillis(); + for (int i = 0; i < appendCount; i++) { + futures.add(sendTestMessage(writer, new String[] {String.valueOf(i)})); + } + long appendElapsedMs = System.currentTimeMillis() - appendStartTimeMs; + assertTrue(appendElapsedMs >= 1000); + + for (int i = 0; i < appendCount; i++) { + assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue()); + } + assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size()); + writer.close(); } } From 70cd20cfb49b176b221144be7ba2fce2a49ef99c Mon Sep 17 00:00:00 2001 From: yayi Date: Tue, 23 Feb 2021 13:36:54 -0800 Subject: [PATCH 5/6] Verify request ordering in unit test --- .../cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java index 53891272b8..9b1804a266 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -104,7 +104,7 @@ private AppendRowsRequest createAppendRequest(String[] messages, long offset) { FooType foo = FooType.newBuilder().setFoo(message).build(); rows.addSerializedRows(foo.toByteString()); } - if (offset > 0) { + if (offset > -1) { requestBuilder.setOffset(Int64Value.of(offset)); } return requestBuilder @@ -354,7 +354,7 @@ public void testAppendsWithTinyMaxInflightBytes() throws Exception { List> futures = new ArrayList<>(); long appendStartTimeMs = System.currentTimeMillis(); for (int i = 0; i < appendCount; i++) { - futures.add(sendTestMessage(writer, new String[] {String.valueOf(i)})); + futures.add(writer.append(createAppendRequest(new String[] {String.valueOf(i)}, i))); } long appendElapsedMs = System.currentTimeMillis() - appendStartTimeMs; assertTrue(appendElapsedMs >= 1000); @@ -363,6 +363,9 @@ public void testAppendsWithTinyMaxInflightBytes() throws Exception { assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue()); } assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size()); + for (int i = 0; i < appendCount; i++) { + assertEquals(i, testBigQueryWrite.getAppendRequests().get(i).getOffset().getValue()); + } writer.close(); } } From 74bb949ea8e4135acce8d6920c6ff74d7209bd71 Mon Sep 17 00:00:00 2001 From: yayi Date: Tue, 23 Feb 2021 14:01:41 -0800 Subject: [PATCH 6/6] Move inflight quota wait to separate method. --- .../storage/v1beta2/StreamWriterV2.java | 39 ++++++++++--------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 198088e1e4..1d001d5bbb 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -215,30 +215,33 @@ public ApiFuture append(AppendRowsRequest message) { this.inflightBytes += requestWrapper.messageSize; waitingRequestQueue.addLast(requestWrapper); hasMessageInWaitingQueue.signal(); - - // Maybe block until we are below inflight limit. - while (this.inflightRequests >= this.maxInflightRequests - || this.inflightBytes >= this.maxInflightBytes) { - try { - inflightReduced.await(100, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - log.warning( - "Interrupted while waiting for inflight quota. Stream: " - + streamName - + " Error: " - + e.toString()); - throw new StatusRuntimeException( - Status.fromCode(Code.CANCELLED) - .withCause(e) - .withDescription("Interrupted while waiting for quota.")); - } - } + maybeWaitForInflightQuota(); return requestWrapper.appendResult; } finally { this.lock.unlock(); } } + @GuardedBy("lock") + private void maybeWaitForInflightQuota() { + while (this.inflightRequests >= this.maxInflightRequests + || this.inflightBytes >= this.maxInflightBytes) { + try { + inflightReduced.await(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.warning( + "Interrupted while waiting for inflight quota. Stream: " + + streamName + + " Error: " + + e.toString()); + throw new StatusRuntimeException( + Status.fromCode(Code.CANCELLED) + .withCause(e) + .withDescription("Interrupted while waiting for quota.")); + } + } + } + /** Close the stream writer. Shut down all resources. */ @Override public void close() {