Skip to content

Commit

Permalink
fix: fix deadlock issue in ConnectionWorkerPool (#1938)
Browse files Browse the repository at this point in the history
* feat: Split writer into connection worker and wrapper, this is a
prerequisite for multiplexing client

* feat: add connection worker pool skeleton, used for multiplexing client

* feat: add Load api for connection worker for multiplexing client

* feat: add multiplexing support to connection worker. We will treat every
new stream name as a switch of destination

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: port the multiplexing client core algorithm and basic tests
also fixed a tiny bug inside fake bigquery write impl for getting the
response from offset

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: wire multiplexing connection pool to stream writer

* feat: some fixes for multiplexing client

* feat: fix some todos, and reject the mixed behavior of passed in client or not

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* feat: add getInflightWaitSeconds implementation

* feat: Add schema comparison in connection loop to ensure schema update for
the same stream name can be notified

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: add schema update support to multiplexing

* fix: fix windows build bug: windows Instant resolution is different from
linux

* fix: fix another failing tests for windows build

* fix: fix another test failure for Windows build

* feat: Change new thread for each retry to be a thread pool to avoid
creating/tearing down too many threads if lots of retries happen

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: add back the background executor provider that's accidentally
removed

* feat: throw error when use connection pool for explicit stream

* fix: Add precision truncation to the passed in value from JSON float and
double type.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* modify the bom version

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix deadlock issue in ConnectionWorkerPool

* fix: fix deadlock issue during close + append for multiplexing

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
GaoleMeng and gcf-owl-bot[bot] authored Jan 20, 2023
1 parent 23ec7fa commit caf1e76
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ private void requestCallback(AppendRowsResponse response) {
if (response.hasError()) {
Exceptions.StorageException storageException =
Exceptions.toStorageException(response.getError(), null);
log.fine(String.format("Got error message: %s", response.toString()));
if (storageException != null) {
requestWrapper.appendResult.setException(storageException);
} else if (response.getRowErrorsCount() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -48,7 +49,7 @@ public class ConnectionWorkerPool {

private static final Logger log = Logger.getLogger(ConnectionWorkerPool.class.getName());
/*
* Max allowed inflight requests in the stream. Method append is blocked at this.
* Max allowed inflight requests in the stream.getInflightWaitSeconds Method append is blocked at this.
*/
private final long maxInflightRequests;

Expand All @@ -68,12 +69,10 @@ public class ConnectionWorkerPool {
private final FlowController.LimitExceededBehavior limitExceededBehavior;

/** Map from write stream to corresponding connection. */
private final Map<StreamWriter, ConnectionWorker> streamWriterToConnection =
new ConcurrentHashMap<>();
private final Map<StreamWriter, ConnectionWorker> streamWriterToConnection = new HashMap<>();

/** Map from a connection to a set of write stream that have sent requests onto it. */
private final Map<ConnectionWorker, Set<StreamWriter>> connectionToWriteStream =
new ConcurrentHashMap<>();
private final Map<ConnectionWorker, Set<StreamWriter>> connectionToWriteStream = new HashMap<>();

/** Collection of all the created connections. */
private final Set<ConnectionWorker> connectionWorkerPool =
Expand Down Expand Up @@ -227,14 +226,13 @@ public ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows
public ApiFuture<AppendRowsResponse> append(
StreamWriter streamWriter, ProtoRows rows, long offset) {
// We are in multiplexing mode after entering the following logic.
ConnectionWorker connectionWorker =
streamWriterToConnection.compute(
streamWriter,
(key, existingStream) -> {
// Though compute on concurrent map is atomic, we still do explicit locking as we
// may have concurrent close(...) triggered.
lock.lock();
try {
ConnectionWorker connectionWorker;
lock.lock();
try {
connectionWorker =
streamWriterToConnection.compute(
streamWriter,
(key, existingStream) -> {
// Stick to the existing stream if it's not overwhelmed.
if (existingStream != null && !existingStream.getLoad().isOverwhelmed()) {
return existingStream;
Expand All @@ -252,10 +250,10 @@ public ApiFuture<AppendRowsResponse> append(
createdOrExistingConnection, (ConnectionWorker k) -> new HashSet<>());
connectionToWriteStream.get(createdOrExistingConnection).add(streamWriter);
return createdOrExistingConnection;
} finally {
lock.unlock();
}
});
});
} finally {
lock.unlock();
}
Stopwatch stopwatch = Stopwatch.createStarted();
ApiFuture<AppendRowsResponse> responseFuture =
connectionWorker.append(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Int64Value;
Expand All @@ -35,6 +38,8 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -314,6 +319,46 @@ public void testMultiStreamAppend_appendWhileClosing() throws Exception {
assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(0);
}

@Test
public void testCloseWhileAppending_noDeadlockHappen() throws Exception {
ConnectionWorkerPool.setOptions(
Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
ConnectionWorkerPool connectionWorkerPool =
createConnectionWorkerPool(
/*maxRequests=*/ 1500, /*maxBytes=*/ 100000, java.time.Duration.ofSeconds(5));

// Sets the sleep time to simulate requests stuck in connection.
testBigQueryWrite.setResponseSleep(Duration.ofMillis(20L));
StreamWriter writeStream1 = getTestStreamWriter(TEST_STREAM_1);

ListeningExecutorService threadPool =
MoreExecutors.listeningDecorator(
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("AsyncStreamReadThread")
.build()));

long appendCount = 10;
for (long i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}
List<Future<?>> futures = new ArrayList<>();

for (int i = 0; i < 500; i++) {
futures.add(
threadPool.submit(
() -> {
sendFooStringTestMessage(
writeStream1, connectionWorkerPool, new String[] {String.valueOf(0)}, 0);
}));
}
connectionWorkerPool.close(writeStream1);
for (int i = 0; i < 500; i++) {
futures.get(i).get();
}
}

@Test
public void testToTableName() {
assertThat(ConnectionWorkerPool.toTableName("projects/p/datasets/d/tables/t/streams/s"))
Expand Down

0 comments on commit caf1e76

Please sign in to comment.