From b365503441ef81f8fe0a97b22bd0e3796ef6d2c4 Mon Sep 17 00:00:00 2001 From: yayi Date: Sat, 20 Feb 2021 17:35:16 -0800 Subject: [PATCH 1/3] Add a new StreamWriterV2. Compared to existing StreamWriter, its locking mechanism is much simpler. --- google-cloud-bigquerystorage/pom.xml | 8 + .../storage/v1beta2/StreamConnection.java | 104 ++++++ .../storage/v1beta2/StreamWriterV2.java | 352 ++++++++++++++++++ .../storage/v1beta2/FakeBigQueryWrite.java | 4 + .../v1beta2/FakeBigQueryWriteImpl.java | 14 + .../storage/v1beta2/StreamWriterV2Test.java | 283 ++++++++++++++ 6 files changed, 765 insertions(+) create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamConnection.java create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java diff --git a/google-cloud-bigquerystorage/pom.xml b/google-cloud-bigquerystorage/pom.xml index 28d5248fa2..b3bc46815c 100644 --- a/google-cloud-bigquerystorage/pom.xml +++ b/google-cloud-bigquerystorage/pom.xml @@ -44,6 +44,14 @@ org.codehaus.mojo flatten-maven-plugin + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamConnection.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamConnection.java new file mode 100644 index 0000000000..c04f43c944 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamConnection.java @@ -0,0 +1,104 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1beta2; + +import com.google.api.gax.rpc.BidiStreamingCallable; +import com.google.api.gax.rpc.ClientStream; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.StreamController; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.StatusRuntimeException; + +/** + * StreamConnection is responsible for writing requests to a GRPC bidirecional connection. + * + *

StreamWriter creates a connection. Two callback functions are necessary: request_callback and + * done_callback. Request callback is used for every request, and done callback is used to notify + * the user that the connection is closed and no more callbacks will be received from this + * connection. + * + *

The stream writer will accept all the requests without flow control, and makes the callbacks + * in receiving order. + * + *

It's user's responsibility to do the flow control and maintain the lifetime of the requests. + */ +public class StreamConnection { + private BidiStreamingCallable bidiStreamingCallable; + private ClientStream clientStream; + + private RequestCallback requestCallback; + private DoneCallback doneCallback; + + public StreamConnection( + BigQueryWriteClient client, RequestCallback requestCallback, DoneCallback doneCallback) { + this.requestCallback = requestCallback; + this.doneCallback = doneCallback; + + bidiStreamingCallable = client.appendRowsCallable(); + clientStream = + bidiStreamingCallable.splitCall( + new ResponseObserver() { + + @Override + public void onStart(StreamController controller) { + // no-op + } + + @Override + public void onResponse(AppendRowsResponse response) { + StreamConnection.this.requestCallback.run(response); + } + + @Override + public void onError(Throwable t) { + StreamConnection.this.doneCallback.run(t); + } + + @Override + public void onComplete() { + StreamConnection.this.doneCallback.run( + new StatusRuntimeException( + Status.fromCode(Code.CANCELLED) + .withDescription("Stream is closed by user."))); + } + }); + } + + /** + * Sends a request to the bi-directional stream connection. + * + * @param request request to send. + */ + public void send(AppendRowsRequest request) { + clientStream.send(request); + } + + /** Close the bi-directional stream connection. */ + public void close() { + clientStream.closeSend(); + } + + /** Invoked when a response is received from the server. */ + public static interface RequestCallback { + public void run(AppendRowsResponse response); + } + + /** Invoked when server closes the connection. */ + public static interface DoneCallback { + public void run(Throwable finalStatus); + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java new file mode 100644 index 0000000000..0a7c158c64 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -0,0 +1,352 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1beta2; + +import com.google.api.core.ApiFuture; +import com.google.api.core.SettableApiFuture; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Uninterruptibles; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import java.time.Duration; +import java.util.Deque; +import java.util.LinkedList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Logger; +import javax.annotation.concurrent.GuardedBy; + +/** + * A BigQuery Stream Writer that can be used to write data into BigQuery Table. + * + *

TODO: Add credential support. + * + *

TODO: Attach schema. + * + *

TODO: Add max size check. + * + *

TODO: Add inflight control. + * + *

TODO: Attach traceId. + * + *

TODO: Support batching. + * + *

TODO: Support schema change. + */ +public class StreamWriterV2 implements AutoCloseable { + private static final Logger log = Logger.getLogger(StreamWriterV2.class.getName()); + + private static final Duration DONE_CALLBACK_WAIT_TIMEOUT = Duration.ofMinutes(10); + + private Lock lock; + private Condition hasMessageInWaitingQueue; + + /* + * The identifier of stream to write to. + */ + private final String streamName; + + /* + * Indicates whether user has called Close() or not. + */ + @GuardedBy("lock") + private boolean userClosed = false; + + /* + * The final status of connection. Set to nonnull when connection is permanently closed. + */ + @GuardedBy("lock") + private Throwable connectionFinalStatus = null; + + /* + * Contains requests buffered in the client and not yet sent to server. + */ + @GuardedBy("lock") + private final Deque waitingRequestQueue; + + /* + * Contains sent append requests waiting for response from server. + */ + @GuardedBy("lock") + private final Deque inflightRequestQueue; + + /* + * Wraps the underlying bi-directional stream connection with server. + */ + private StreamConnection streamConnection; + + /* + * A separate thread to handle actual communication with server. + */ + private Thread appendThread; + + private StreamWriterV2(Builder builder) { + this.lock = new ReentrantLock(); + this.hasMessageInWaitingQueue = lock.newCondition(); + this.streamName = builder.streamName; + this.waitingRequestQueue = new LinkedList(); + this.inflightRequestQueue = new LinkedList(); + this.streamConnection = + new StreamConnection(builder.client, this::requestCallback, this::doneCallback); + this.appendThread = new Thread(this::appendLoop); + this.appendThread.start(); + } + + /** + * Schedules the writing of a message. + * + *

Example of writing a message. + * + *

{@code
+   * AppendRowsRequest message;
+   * ApiFuture messageIdFuture = writer.append(message);
+   * ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback() {
+   *   public void onSuccess(AppendRowsResponse response) {
+   *     if (response.hasOffset()) {
+   *       System.out.println("written with offset: " + response.getOffset());
+   *     } else {
+   *       System.out.println("received an in stream error: " + response.error().toString());
+   *     }
+   *   }
+   *
+   *   public void onFailure(Throwable t) {
+   *     System.out.println("failed to write: " + t);
+   *   }
+   * }, MoreExecutors.directExecutor());
+   * }
+ * + * @param message the message in serialized format to write to BigQuery. + * @return the message ID wrapped in a future. + */ + public ApiFuture append(AppendRowsRequest message) { + AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message); + this.lock.lock(); + try { + if (userClosed) { + requestWrapper.appendResult.setException( + new StatusRuntimeException( + Status.fromCode(Status.Code.FAILED_PRECONDITION) + .withDescription("Stream is already closed"))); + return requestWrapper.appendResult; + } + if (connectionFinalStatus != null) { + requestWrapper.appendResult.setException( + new StatusRuntimeException( + Status.fromCode(Status.Code.FAILED_PRECONDITION) + .withDescription( + "Stream is closed due to " + connectionFinalStatus.toString()))); + return requestWrapper.appendResult; + } + waitingRequestQueue.addLast(requestWrapper); + hasMessageInWaitingQueue.signal(); + return requestWrapper.appendResult; + } finally { + this.lock.unlock(); + } + } + + /** Close the stream writer. Shut down all resources. */ + @Override + public void close() { + log.info("User closing stream: " + streamName); + this.lock.lock(); + try { + this.userClosed = true; + } finally { + this.lock.unlock(); + } + log.info("Waiting for append thread to finish. Stream: " + streamName); + try { + appendThread.join(); + log.info("User close complete. Stream: " + streamName); + } catch (InterruptedException e) { + // Unexpected. Just swallow the exception with logging. + log.warning( + "Append handler join is interrupted. Stream: " + streamName + " Error: " + e.toString()); + } + } + + /* + * This loop is executed in a separate thread. + * + * It takes requests from waiting queue and sends them to server. + */ + private void appendLoop() { + Deque localQueue = new LinkedList(); + while (!waitingQueueDrained()) { + this.lock.lock(); + try { + hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS); + while (!this.waitingRequestQueue.isEmpty()) { + localQueue.addLast(this.waitingRequestQueue.pollFirst()); + } + } catch (InterruptedException e) { + log.warning("Interrupted while waiting for message. Error: " + e.toString()); + } finally { + this.lock.unlock(); + } + + if (localQueue.isEmpty()) { + continue; + } + + // TODO: Add reconnection here. + + this.lock.lock(); + try { + while (!localQueue.isEmpty()) { + AppendRequestAndResponse requestWrapper = localQueue.pollFirst(); + this.inflightRequestQueue.addLast(requestWrapper); + this.streamConnection.send(requestWrapper.message); + } + } finally { + this.lock.unlock(); + } + } + + log.info("Cleanup starts. Stream: " + streamName); + // At this point, the waiting queue is drained, so no more requests. + // We can close the stream connection and handle the remaining inflight requests. + this.streamConnection.close(); + + log.info("Waiting for done callback from stream connection. Stream: " + streamName); + long waitDeadlineMs = System.currentTimeMillis() + DONE_CALLBACK_WAIT_TIMEOUT.toMillis(); + while (true) { + if (System.currentTimeMillis() > waitDeadlineMs) { + log.warning( + "Timeout waiting for done wallback. Skip inflight cleanup. Stream: " + streamName); + return; + } + this.lock.lock(); + try { + if (connectionFinalStatus != null) { + // Done callback is received, break. + break; + } + } finally { + this.lock.unlock(); + } + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + + // At this point, there cannot be more callback. It is safe to clean up all inflight requests. + log.info( + "Stream connection is fully closed. Cleaning up inflight requests. Stream: " + streamName); + cleanupInflightRequests(); + log.info("Append thread is done. Stream: " + streamName); + } + + /* + * Returns true if waiting queue is drain, a.k.a. no more requests in the waiting queue. + * + * It serves as a signal to append thread that there cannot be any more requests in the waiting + * queue and it can prepare to stop. + */ + private boolean waitingQueueDrained() { + this.lock.lock(); + try { + return (this.userClosed || this.connectionFinalStatus != null) + && this.waitingRequestQueue.isEmpty(); + } finally { + this.lock.unlock(); + } + } + + private void cleanupInflightRequests() { + Throwable finalStatus; + Deque localQueue = new LinkedList(); + this.lock.lock(); + try { + finalStatus = this.connectionFinalStatus; + while (!this.inflightRequestQueue.isEmpty()) { + localQueue.addLast(this.inflightRequestQueue.pollFirst()); + } + } finally { + this.lock.unlock(); + } + log.info( + "Cleaning " + + localQueue.size() + + " inflight requests with error: " + + finalStatus.toString()); + while (!localQueue.isEmpty()) { + localQueue.pollFirst().appendResult.setException(finalStatus); + } + } + + private void requestCallback(AppendRowsResponse response) { + AppendRequestAndResponse requestWrapper; + this.lock.lock(); + try { + requestWrapper = this.inflightRequestQueue.pollFirst(); + } finally { + this.lock.unlock(); + } + requestWrapper.appendResult.set(response); + } + + private void doneCallback(Throwable finalStatus) { + log.info( + "Received done callback. Stream: " + + streamName + + " Final status: " + + finalStatus.toString()); + this.lock.lock(); + try { + this.connectionFinalStatus = finalStatus; + } finally { + this.lock.unlock(); + } + } + + /** Constructs a new {@link StreamWriterV2.Builder} using the given stream and client. */ + public static StreamWriterV2.Builder newBuilder(String streamName, BigQueryWriteClient client) { + return new StreamWriterV2.Builder(streamName, client); + } + + /** A builder of {@link StreamWriterV2}s. */ + public static final class Builder { + + private String streamName; + + private BigQueryWriteClient client; + + private Builder(String streamName, BigQueryWriteClient client) { + this.streamName = Preconditions.checkNotNull(streamName); + ; + this.client = Preconditions.checkNotNull(client); + ; + } + + /** Builds the {@code StreamWriterV2}. */ + public StreamWriterV2 build() { + return new StreamWriterV2(this); + } + } + + // Class that wraps AppendRowsRequest and its corresponding Response future. + private static final class AppendRequestAndResponse { + final SettableApiFuture appendResult; + final AppendRowsRequest message; + + AppendRequestAndResponse(AppendRowsRequest message) { + this.appendResult = SettableApiFuture.create(); + this.message = message; + } + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWrite.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWrite.java index 618366cfdc..a333260529 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWrite.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWrite.java @@ -79,6 +79,10 @@ public void setResponseDelay(Duration delay) { serviceImpl.setResponseDelay(delay); } + public void setResponseSleep(Duration sleep) { + serviceImpl.setResponseSleep(sleep); + } + public void setExecutor(ScheduledExecutorService executor) { serviceImpl.setExecutor(executor); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWriteImpl.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWriteImpl.java index 7cef4f7483..b99dab99bd 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWriteImpl.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWriteImpl.java @@ -16,6 +16,7 @@ package com.google.cloud.bigquery.storage.v1beta2; import com.google.common.base.Optional; +import com.google.common.util.concurrent.Uninterruptibles; import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.List; @@ -44,6 +45,7 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase { private boolean autoPublishResponse; private ScheduledExecutorService executor = null; private Duration responseDelay = Duration.ZERO; + private Duration responseSleep = Duration.ZERO; /** Class used to save the state of a possible response. */ private static class Response { @@ -121,10 +123,16 @@ public void onNext(AppendRowsRequest value) { LOG.info("Get request:" + value.toString()); final Response response = responses.remove(); requests.add(value); + if (responseSleep.compareTo(Duration.ZERO) > 0) { + LOG.info("Sleeping before response for " + responseSleep.toString()); + Uninterruptibles.sleepUninterruptibly( + responseSleep.toMillis(), TimeUnit.MILLISECONDS); + } if (responseDelay == Duration.ZERO) { sendResponse(response, responseObserver); } else { final Response responseToSend = response; + // TODO(yirutang): This is very wrong because it messes up response/complete ordering. LOG.info("Schedule a response to be sent at delay"); executor.schedule( new Runnable() { @@ -173,6 +181,12 @@ public FakeBigQueryWriteImpl setResponseDelay(Duration responseDelay) { return this; } + /** Set an amount of time by which to sleep before publishing responses. */ + public FakeBigQueryWriteImpl setResponseSleep(Duration responseSleep) { + this.responseSleep = responseSleep; + return this; + } + public FakeBigQueryWriteImpl addResponse(AppendRowsResponse appendRowsResponse) { responses.add(new Response(appendRowsResponse)); return this; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java new file mode 100644 index 0000000000..39f680b374 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -0,0 +1,283 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1beta2; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.testing.MockGrpcService; +import com.google.api.gax.grpc.testing.MockServiceHelper; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.StatusCode.Code; +import com.google.cloud.bigquery.storage.test.Test.FooType; +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Int64Value; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.threeten.bp.Duration; + +@RunWith(JUnit4.class) +public class StreamWriterV2Test { + private static final Logger log = Logger.getLogger(StreamWriterV2Test.class.getName()); + private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s"; + private FakeScheduledExecutorService fakeExecutor; + private FakeBigQueryWrite testBigQueryWrite; + private static MockServiceHelper serviceHelper; + private BigQueryWriteClient client; + + @Before + public void setUp() throws Exception { + testBigQueryWrite = new FakeBigQueryWrite(); + serviceHelper = + new MockServiceHelper( + UUID.randomUUID().toString(), Arrays.asList(testBigQueryWrite)); + serviceHelper.start(); + fakeExecutor = new FakeScheduledExecutorService(); + testBigQueryWrite.setExecutor(fakeExecutor); + client = + BigQueryWriteClient.create( + BigQueryWriteSettings.newBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider(serviceHelper.createChannelProvider()) + .build()); + } + + @After + public void tearDown() throws Exception { + log.info("tearDown called"); + client.close(); + serviceHelper.stop(); + } + + private StreamWriterV2 getTestStreamWriterV2() { + return StreamWriterV2.newBuilder(TEST_STREAM, client).build(); + } + + private AppendRowsRequest createAppendRequest(String[] messages, long offset) { + AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); + AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder(); + dataBuilder.setWriterSchema( + ProtoSchema.newBuilder() + .setProtoDescriptor( + DescriptorProtos.DescriptorProto.newBuilder() + .setName("Message") + .addField( + DescriptorProtos.FieldDescriptorProto.newBuilder() + .setName("foo") + .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING) + .setNumber(1) + .build()) + .build())); + ProtoRows.Builder rows = ProtoRows.newBuilder(); + for (String message : messages) { + FooType foo = FooType.newBuilder().setFoo(message).build(); + rows.addSerializedRows(foo.toByteString()); + } + if (offset > 0) { + requestBuilder.setOffset(Int64Value.of(offset)); + } + return requestBuilder + .setProtoRows(dataBuilder.setRows(rows.build()).build()) + .setWriteStream(TEST_STREAM) + .build(); + } + + private AppendRowsResponse createAppendResponse(long offset) { + return AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(offset)).build()) + .build(); + } + + private ApiFuture sendTestMessage(StreamWriterV2 writer, String[] messages) { + return writer.append(createAppendRequest(messages, -1)); + } + + private static T assertFutureException( + Class expectedThrowable, Future future) { + return assertThrows( + expectedThrowable, + () -> { + try { + future.get(); + } catch (ExecutionException ex) { + // Future wraps exception with ExecutionException. So unwrapper it here. + throw ex.getCause(); + } + }); + } + + @Test + public void testAppendSuccess() throws Exception { + StreamWriterV2 writer = getTestStreamWriterV2(); + + long appendCount = 1000; + for (int i = 0; i < appendCount; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + + List> futures = new ArrayList<>(); + for (int i = 0; i < appendCount; i++) { + futures.add(sendTestMessage(writer, new String[] {String.valueOf(i)})); + } + + for (int i = 0; i < appendCount; i++) { + assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue()); + } + assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size()); + + writer.close(); + } + + @Test + public void testAppendSuccessAndError() throws Exception { + StreamWriterV2 writer = getTestStreamWriterV2(); + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addException(Status.INTERNAL.asException()); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + ApiException actualError = assertFutureException(ApiException.class, appendFuture2); + assertEquals(Code.INTERNAL, actualError.getStatusCode().getCode()); + + writer.close(); + } + + @Test + public void longIdleBetweenAppends() throws Exception { + StreamWriterV2 writer = getTestStreamWriterV2(); + testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addResponse(createAppendResponse(1)); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + + // Sleep to create a long idle between appends. + TimeUnit.SECONDS.sleep(3); + + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + assertEquals(1, appendFuture2.get().getAppendResult().getOffset().getValue()); + + writer.close(); + } + + @Test + public void testAppendAfterUserClose() throws Exception { + StreamWriterV2 writer = getTestStreamWriterV2(); + testBigQueryWrite.addResponse(createAppendResponse(0)); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + writer.close(); + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertTrue(appendFuture2.isDone()); + StatusRuntimeException actualError = + assertFutureException(StatusRuntimeException.class, appendFuture2); + assertEquals(Status.Code.FAILED_PRECONDITION, actualError.getStatus().getCode()); + } + + @Test + public void testAppendAfterServerClose() throws Exception { + StreamWriterV2 writer = getTestStreamWriterV2(); + testBigQueryWrite.addException(Status.INTERNAL.asException()); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + ApiException error1 = assertFutureException(ApiException.class, appendFuture1); + assertEquals(Code.INTERNAL, error1.getStatusCode().getCode()); + + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + assertTrue(appendFuture2.isDone()); + StatusRuntimeException error2 = + assertFutureException(StatusRuntimeException.class, appendFuture2); + assertEquals(Status.Code.FAILED_PRECONDITION, error2.getStatus().getCode()); + + writer.close(); + } + + @Test + public void userCloseWhileRequestInflight() throws Exception { + StreamWriterV2 writer = getTestStreamWriterV2(); + // Server will sleep 2 seconds before sending back the response. + testBigQueryWrite.setResponseSleep(Duration.ofSeconds(2)); + testBigQueryWrite.addResponse(createAppendResponse(0)); + + // Send a request and close the stream in separate thread while the request is inflight. + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + Thread closeThread = + new Thread( + () -> { + writer.close(); + }); + closeThread.start(); + + // Due to the sleep on server, the append won't finish within 1 second even though stream + // is being closed. + assertThrows( + TimeoutException.class, + () -> { + appendFuture1.get(1, TimeUnit.SECONDS); + }); + + // Within 2 seconds, the request should be done and stream should be closed. + closeThread.join(2000); + assertTrue(appendFuture1.isDone()); + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + } + + @Test + public void serverCloseWhileRequestsInflight() throws Exception { + StreamWriterV2 writer = getTestStreamWriterV2(); + // Server will sleep 2 seconds before closing the connection. + testBigQueryWrite.setResponseSleep(Duration.ofSeconds(2)); + testBigQueryWrite.addException(Status.INTERNAL.asException()); + + // Send 10 requests, so that there are 10 inflight requests. + int appendCount = 10; + List> futures = new ArrayList<>(); + for (int i = 0; i < appendCount; i++) { + futures.add(sendTestMessage(writer, new String[] {String.valueOf(i)})); + } + + // Server close should properly handle all inflight requests. + for (int i = 0; i < appendCount; i++) { + ApiException actualError = assertFutureException(ApiException.class, futures.get(i)); + assertEquals(Code.INTERNAL, actualError.getStatusCode().getCode()); + } + + writer.close(); + ; + } +} From 33124023922020abeeca875333906037bec2f7f4 Mon Sep 17 00:00:00 2001 From: yayi Date: Mon, 22 Feb 2021 12:04:51 -0800 Subject: [PATCH 2/3] Stop using Java8 features as we still need to support Java7 --- google-cloud-bigquerystorage/pom.xml | 8 ----- .../storage/v1beta2/StreamWriterV2.java | 23 +++++++++--- .../storage/v1beta2/StreamWriterV2Test.java | 36 ++++++++++++------- 3 files changed, 42 insertions(+), 25 deletions(-) diff --git a/google-cloud-bigquerystorage/pom.xml b/google-cloud-bigquerystorage/pom.xml index b3bc46815c..28d5248fa2 100644 --- a/google-cloud-bigquerystorage/pom.xml +++ b/google-cloud-bigquerystorage/pom.xml @@ -44,14 +44,6 @@ org.codehaus.mojo flatten-maven-plugin - - org.apache.maven.plugins - maven-compiler-plugin - - 8 - 8 - - diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 0a7c158c64..5b58dccae8 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -17,6 +17,8 @@ import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; +import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.DoneCallback; +import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.RequestCallback; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Uninterruptibles; import io.grpc.Status; @@ -102,8 +104,23 @@ private StreamWriterV2(Builder builder) { this.waitingRequestQueue = new LinkedList(); this.inflightRequestQueue = new LinkedList(); this.streamConnection = - new StreamConnection(builder.client, this::requestCallback, this::doneCallback); - this.appendThread = new Thread(this::appendLoop); + new StreamConnection(builder.client, new RequestCallback() { + @Override + public void run(AppendRowsResponse response) { + requestCallback(response); + } + }, new DoneCallback() { + @Override + public void run(Throwable finalStatus) { + doneCallback(finalStatus); + } + }); + this.appendThread = new Thread(new Runnable() { + @Override + public void run() { + appendLoop(); + } + }); this.appendThread.start(); } @@ -328,9 +345,7 @@ public static final class Builder { private Builder(String streamName, BigQueryWriteClient client) { this.streamName = Preconditions.checkNotNull(streamName); - ; this.client = Preconditions.checkNotNull(client); - ; } /** Builds the {@code StreamWriterV2}. */ diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java index 39f680b374..c50e5abb70 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -42,6 +42,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.function.ThrowingRunnable; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.threeten.bp.Duration; @@ -124,15 +125,18 @@ private ApiFuture sendTestMessage(StreamWriterV2 writer, Str } private static T assertFutureException( - Class expectedThrowable, Future future) { + Class expectedThrowable, final Future future) { return assertThrows( expectedThrowable, - () -> { - try { - future.get(); - } catch (ExecutionException ex) { - // Future wraps exception with ExecutionException. So unwrapper it here. - throw ex.getCause(); + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + try { + future.get(); + } catch (ExecutionException ex) { + // Future wraps exception with ExecutionException. So unwrapper it here. + throw ex.getCause(); + } } }); } @@ -229,17 +233,20 @@ public void testAppendAfterServerClose() throws Exception { @Test public void userCloseWhileRequestInflight() throws Exception { - StreamWriterV2 writer = getTestStreamWriterV2(); + final StreamWriterV2 writer = getTestStreamWriterV2(); // Server will sleep 2 seconds before sending back the response. testBigQueryWrite.setResponseSleep(Duration.ofSeconds(2)); testBigQueryWrite.addResponse(createAppendResponse(0)); // Send a request and close the stream in separate thread while the request is inflight. - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + final ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); Thread closeThread = new Thread( - () -> { - writer.close(); + new Runnable() { + @Override + public void run() { + writer.close(); + } }); closeThread.start(); @@ -247,8 +254,11 @@ public void userCloseWhileRequestInflight() throws Exception { // is being closed. assertThrows( TimeoutException.class, - () -> { - appendFuture1.get(1, TimeUnit.SECONDS); + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + appendFuture1.get(1, TimeUnit.SECONDS); + } }); // Within 2 seconds, the request should be done and stream should be closed. From b98d6be0a648aab4aed280142508389d89a03dce Mon Sep 17 00:00:00 2001 From: yayi Date: Mon, 22 Feb 2021 12:24:15 -0800 Subject: [PATCH 3/3] Do not hold lock while sending requests, and some minor refactoring. --- .../storage/v1beta2/StreamWriterV2.java | 101 +++++++++--------- 1 file changed, 49 insertions(+), 52 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 5b58dccae8..10ed99d137 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -23,7 +23,6 @@ import com.google.common.util.concurrent.Uninterruptibles; import io.grpc.Status; import io.grpc.StatusRuntimeException; -import java.time.Duration; import java.util.Deque; import java.util.LinkedList; import java.util.concurrent.TimeUnit; @@ -53,8 +52,6 @@ public class StreamWriterV2 implements AutoCloseable { private static final Logger log = Logger.getLogger(StreamWriterV2.class.getName()); - private static final Duration DONE_CALLBACK_WAIT_TIMEOUT = Duration.ofMinutes(10); - private Lock lock; private Condition hasMessageInWaitingQueue; @@ -104,23 +101,28 @@ private StreamWriterV2(Builder builder) { this.waitingRequestQueue = new LinkedList(); this.inflightRequestQueue = new LinkedList(); this.streamConnection = - new StreamConnection(builder.client, new RequestCallback() { - @Override - public void run(AppendRowsResponse response) { - requestCallback(response); - } - }, new DoneCallback() { - @Override - public void run(Throwable finalStatus) { - doneCallback(finalStatus); - } - }); - this.appendThread = new Thread(new Runnable() { - @Override - public void run() { - appendLoop(); - } - }); + new StreamConnection( + builder.client, + new RequestCallback() { + @Override + public void run(AppendRowsResponse response) { + requestCallback(response); + } + }, + new DoneCallback() { + @Override + public void run(Throwable finalStatus) { + doneCallback(finalStatus); + } + }); + this.appendThread = + new Thread( + new Runnable() { + @Override + public void run() { + appendLoop(); + } + }); this.appendThread.start(); } @@ -210,10 +212,16 @@ private void appendLoop() { try { hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS); while (!this.waitingRequestQueue.isEmpty()) { - localQueue.addLast(this.waitingRequestQueue.pollFirst()); + AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst(); + this.inflightRequestQueue.addLast(requestWrapper); + localQueue.addLast(requestWrapper); } } catch (InterruptedException e) { - log.warning("Interrupted while waiting for message. Error: " + e.toString()); + log.warning( + "Interrupted while waiting for message. Stream: " + + streamName + + " Error: " + + e.toString()); } finally { this.lock.unlock(); } @@ -223,16 +231,8 @@ private void appendLoop() { } // TODO: Add reconnection here. - - this.lock.lock(); - try { - while (!localQueue.isEmpty()) { - AppendRequestAndResponse requestWrapper = localQueue.pollFirst(); - this.inflightRequestQueue.addLast(requestWrapper); - this.streamConnection.send(requestWrapper.message); - } - } finally { - this.lock.unlock(); + while (!localQueue.isEmpty()) { + this.streamConnection.send(localQueue.pollFirst().message); } } @@ -240,26 +240,7 @@ private void appendLoop() { // At this point, the waiting queue is drained, so no more requests. // We can close the stream connection and handle the remaining inflight requests. this.streamConnection.close(); - - log.info("Waiting for done callback from stream connection. Stream: " + streamName); - long waitDeadlineMs = System.currentTimeMillis() + DONE_CALLBACK_WAIT_TIMEOUT.toMillis(); - while (true) { - if (System.currentTimeMillis() > waitDeadlineMs) { - log.warning( - "Timeout waiting for done wallback. Skip inflight cleanup. Stream: " + streamName); - return; - } - this.lock.lock(); - try { - if (connectionFinalStatus != null) { - // Done callback is received, break. - break; - } - } finally { - this.lock.unlock(); - } - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - } + waitForDoneCallback(); // At this point, there cannot be more callback. It is safe to clean up all inflight requests. log.info( @@ -284,6 +265,22 @@ private boolean waitingQueueDrained() { } } + private void waitForDoneCallback() { + log.info("Waiting for done callback from stream connection. Stream: " + streamName); + while (true) { + this.lock.lock(); + try { + if (connectionFinalStatus != null) { + // Done callback is received, return. + return; + } + } finally { + this.lock.unlock(); + } + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + } + private void cleanupInflightRequests() { Throwable finalStatus; Deque localQueue = new LinkedList();