From caf1e7603153b1b8de90d6294ac15c711076d8f4 Mon Sep 17 00:00:00 2001
From: Gaole Meng
Date: Fri, 20 Jan 2023 11:34:29 -0800
Subject: [PATCH] fix: fix deadlock issue in ConnectionWorkerPool (#1938)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* feat: Split writer into connection worker and wrapper; this is a prerequisite for the multiplexing client

* feat: add connection worker pool skeleton, used for the multiplexing client

* feat: add Load api for connection worker for the multiplexing client

* feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destination

* 🦉 Updates from OwlBot post-processor

  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: port the multiplexing client core algorithm and basic tests; also fixed a tiny bug inside the fake bigquery write impl for getting the response from an offset

* 🦉 Updates from OwlBot post-processor

  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: wire multiplexing connection pool to stream writer

* feat: some fixes for multiplexing client

* feat: fix some todos, and reject mixing a passed-in client with an internally created one

* feat: fix the bug that we may peek into the write_stream field while the proto schema may not contain this field

* 🦉 Updates from OwlBot post-processor

  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: fix the bug that we may peek into the write_stream field while the proto schema may not contain this field

* feat: add getInflightWaitSeconds implementation

* feat: Add schema comparison in the connection loop to ensure a schema update for the same stream name can be notified

* 🦉 Updates from OwlBot post-processor

  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: add schema update support to multiplexing

* fix: fix Windows build bug: Windows Instant resolution differs from Linux

* fix: fix another failing test for the Windows build

* fix: fix another test failure for the Windows build

* feat: Change the new-thread-per-retry behavior to a thread pool, to avoid creating/tearing down too many threads if lots of retries happen

* 🦉 Updates from OwlBot post-processor

  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: add back the background executor provider that was accidentally removed

* feat: throw an error when the connection pool is used for an explicit stream

* fix: Add precision truncation to the passed-in value for JSON float and double types.
* 🦉 Updates from OwlBot post-processor

  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* modify the bom version

* 🦉 Updates from OwlBot post-processor

  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix deadlock issue in ConnectionWorkerPool

* fix: fix deadlock issue during close + append for multiplexing

* 🦉 Updates from OwlBot post-processor

  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot
---
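Note on the deadlock being fixed: before this patch, append() called
streamWriterToConnection.compute(...) on a ConcurrentHashMap and acquired the
pool lock inside the compute lambda, while the close() path (the removed comment
notes that a concurrent close(...) may be triggered) takes that same pool lock
first and then touches the maps. The sketch below illustrates that
lock-ordering hazard in isolation; it is not the library's code, and the class
and member names (DeadlockHazardSketch, poolLock, connections) are invented for
the example.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.ReentrantLock;

    /** Illustrative only (names invented): two paths taking two locks in opposite order. */
    class DeadlockHazardSketch {
      private final ReentrantLock poolLock = new ReentrantLock();
      private final Map<String, String> connections = new ConcurrentHashMap<>();

      // Old append() shape: compute() holds the map's internal bin lock for the key
      // while the lambda blocks waiting for poolLock.
      String append(String writer) {
        return connections.compute(
            writer,
            (key, existing) -> {
              poolLock.lock(); // waits for poolLock while the bin lock is held
              try {
                return existing == null ? "new-connection" : existing;
              } finally {
                poolLock.unlock();
              }
            });
      }

      // close() shape: takes poolLock first, then needs the same bin lock inside the map.
      void close(String writer) {
        poolLock.lock();
        try {
          connections.remove(writer); // blocks if another thread is inside append()'s lambda
        } finally {
          poolLock.unlock();
        }
      }
    }

If one thread is inside append()'s compute lambda and another is inside close(),
each waits on a lock the other holds and neither makes progress; the diff below
removes the inner lock acquisition and takes the pool lock before the map access.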
 .../bigquery/storage/v1/ConnectionWorker.java |  1 +
 .../storage/v1/ConnectionWorkerPool.java      | 32 +++++++------
 .../storage/v1/ConnectionWorkerPoolTest.java  | 45 +++++++++++++++++++
 3 files changed, 61 insertions(+), 17 deletions(-)

diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
index 4a32f57239..4e17850511 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
@@ -642,6 +642,7 @@ private void requestCallback(AppendRowsResponse response) {
     if (response.hasError()) {
       Exceptions.StorageException storageException =
           Exceptions.toStorageException(response.getError(), null);
+      log.fine(String.format("Got error message: %s", response.toString()));
       if (storageException != null) {
         requestWrapper.appendResult.setException(storageException);
       } else if (response.getRowErrorsCount() > 0) {
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
index 40f21b72cb..7bcb358eea 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -48,7 +49,7 @@ public class ConnectionWorkerPool {
   private static final Logger log = Logger.getLogger(ConnectionWorkerPool.class.getName());

   /*
-   * Max allowed inflight requests in the stream.getInflightWaitSeconds Method append is blocked at this.
+   * Max allowed inflight requests in the stream. Method append is blocked at this.
    */
   private final long maxInflightRequests;

@@ -68,12 +69,10 @@ public class ConnectionWorkerPool {
   private final FlowController.LimitExceededBehavior limitExceededBehavior;

   /** Map from write stream to corresponding connection. */
-  private final Map<StreamWriter, ConnectionWorker> streamWriterToConnection =
-      new ConcurrentHashMap<>();
+  private final Map<StreamWriter, ConnectionWorker> streamWriterToConnection = new HashMap<>();

   /** Map from a connection to a set of write stream that have sent requests onto it. */
-  private final Map<ConnectionWorker, Set<StreamWriter>> connectionToWriteStream =
-      new ConcurrentHashMap<>();
+  private final Map<ConnectionWorker, Set<StreamWriter>> connectionToWriteStream = new HashMap<>();

   /** Collection of all the created connections. */
   private final Set<ConnectionWorker> connectionWorkerPool =
@@ -227,14 +226,13 @@ public ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows
   public ApiFuture<AppendRowsResponse> append(
       StreamWriter streamWriter, ProtoRows rows, long offset) {
     // We are in multiplexing mode after entering the following logic.
-    ConnectionWorker connectionWorker =
-        streamWriterToConnection.compute(
-            streamWriter,
-            (key, existingStream) -> {
-              // Though compute on concurrent map is atomic, we still do explicit locking as we
-              // may have concurrent close(...) triggered.
-              lock.lock();
-              try {
+    ConnectionWorker connectionWorker;
+    lock.lock();
+    try {
+      connectionWorker =
+          streamWriterToConnection.compute(
+              streamWriter,
+              (key, existingStream) -> {
                 // Stick to the existing stream if it's not overwhelmed.
                 if (existingStream != null && !existingStream.getLoad().isOverwhelmed()) {
                   return existingStream;
@@ -252,10 +250,10 @@ public ApiFuture<AppendRowsResponse> append(
                     createdOrExistingConnection, (ConnectionWorker k) -> new HashSet<>());
                 connectionToWriteStream.get(createdOrExistingConnection).add(streamWriter);
                 return createdOrExistingConnection;
-            } finally {
-              lock.unlock();
-            }
-          });
+              });
+    } finally {
+      lock.unlock();
+    }
     Stopwatch stopwatch = Stopwatch.createStarted();
     ApiFuture<AppendRowsResponse> responseFuture =
         connectionWorker.append(
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java
index 415c35329a..980772b2ff 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java
@@ -25,6 +25,9 @@
 import com.google.api.gax.grpc.testing.MockServiceHelper;
 import com.google.cloud.bigquery.storage.test.Test.FooType;
 import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.DescriptorProtos;
 import com.google.protobuf.Int64Value;
@@ -35,6 +38,8 @@
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.junit.Before;
 import org.junit.Test;
@@ -314,6 +319,46 @@ public void testMultiStreamAppend_appendWhileClosing() throws Exception {
     assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(0);
   }

+  @Test
+  public void testCloseWhileAppending_noDeadlockHappen() throws Exception {
+    ConnectionWorkerPool.setOptions(
+        Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
+    ConnectionWorkerPool connectionWorkerPool =
+        createConnectionWorkerPool(
+            /*maxRequests=*/ 1500, /*maxBytes=*/ 100000, java.time.Duration.ofSeconds(5));
+
+    // Sets the sleep time to simulate requests stuck in connection.
+    testBigQueryWrite.setResponseSleep(Duration.ofMillis(20L));
+    StreamWriter writeStream1 = getTestStreamWriter(TEST_STREAM_1);
+
+    ListeningExecutorService threadPool =
+        MoreExecutors.listeningDecorator(
+            Executors.newCachedThreadPool(
+                new ThreadFactoryBuilder()
+                    .setDaemon(true)
+                    .setNameFormat("AsyncStreamReadThread")
+                    .build()));
+
+    long appendCount = 10;
+    for (long i = 0; i < appendCount; i++) {
+      testBigQueryWrite.addResponse(createAppendResponse(i));
+    }
+    List<Future<?>> futures = new ArrayList<>();
+
+    for (int i = 0; i < 500; i++) {
+      futures.add(
+          threadPool.submit(
+              () -> {
+                sendFooStringTestMessage(
+                    writeStream1, connectionWorkerPool, new String[] {String.valueOf(0)}, 0);
+              }));
+    }
+    connectionWorkerPool.close(writeStream1);
+    for (int i = 0; i < 500; i++) {
+      futures.get(i).get();
+    }
+  }
+
   @Test
   public void testToTableName() {
     assertThat(ConnectionWorkerPool.toTableName("projects/p/datasets/d/tables/t/streams/s"))
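
Note: the sketch below restates, in isolation, the locking order this patch
introduces in ConnectionWorkerPool.append(): take the single pool lock before
touching either map, so the maps can be plain HashMaps and no map-internal lock
is ever held while waiting on the pool lock. It is a simplified sketch, not the
library's API; the class and member names (PoolLockingSketch, writerToConnection,
connectionToWriters) are invented stand-ins.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.locks.ReentrantLock;

    /** Simplified sketch of the fixed locking order; not the library's code. */
    class PoolLockingSketch {
      private final ReentrantLock lock = new ReentrantLock();
      private final Map<String, String> writerToConnection = new HashMap<>();
      private final Map<String, Set<String>> connectionToWriters = new HashMap<>();

      // append() shape after the fix: acquire the pool lock first, then mutate the plain maps.
      String append(String writer) {
        String connection;
        lock.lock();
        try {
          connection =
              writerToConnection.compute(
                  writer, (key, existing) -> existing != null ? existing : "connection-for-" + key);
          connectionToWriters.computeIfAbsent(connection, k -> new HashSet<>()).add(writer);
        } finally {
          lock.unlock();
        }
        return connection; // the actual RPC append happens outside the lock
      }

      // close() shape: same lock, same order, so no circular wait is possible.
      void close(String writer) {
        lock.lock();
        try {
          String connection = writerToConnection.remove(writer);
          if (connection != null) {
            connectionToWriters.getOrDefault(connection, new HashSet<>()).remove(writer);
          }
        } finally {
          lock.unlock();
        }
      }

      public static void main(String[] args) {
        PoolLockingSketch pool = new PoolLockingSketch();
        System.out.println(pool.append("stream-1")); // prints connection-for-stream-1
        pool.close("stream-1");
      }
    }

Because both paths now acquire only the pool lock, and always before any map
access, the close-versus-append lock inversion exercised by
testCloseWhileAppending_noDeadlockHappen can no longer occur.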