fix: remove unrecoverable connection from connection pool during multiplexing (#1967)

* feat: Split writer into connection worker and wrapper; this is a prerequisite for the multiplexing client

* feat: add connection worker pool skeleton, used for multiplexing client

* feat: add Load API for connection worker for the multiplexing client

* feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destination

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: port the multiplexing client core algorithm and basic tests; also fixed a tiny bug inside the fake bigquery write impl for getting the response from offset

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: wire multiplexing connection pool to stream writer

* feat: some fixes for multiplexing client

* feat: fix some TODOs, and reject the mixed behavior of passing in a client or not

* feat: fix the bug where we may peek into the write_stream field even though the proto schema may not contain this field

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: fix the bug where we may peek into the write_stream field even though the proto schema may not contain this field

* feat: add getInflightWaitSeconds implementation

* feat: Add schema comparison in the connection loop to ensure schema updates for the same stream name can be notified

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: add schema update support to multiplexing

* fix: fix Windows build bug: Windows Instant resolution differs from Linux

* fix: fix another failing test for the Windows build

* fix: fix another test failure for Windows build

* feat: Change the new-thread-per-retry approach to a thread pool, to avoid creating/tearing down too many threads when many retries happen

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: add back the background executor provider that was accidentally removed

* feat: throw an error when using the connection pool for an explicit stream

* fix: Add precision truncation to the passed-in value for the JSON float and double types.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* modify the BOM version

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix deadlock issue in ConnectionWorkerPool

* fix: fix deadlock issue during close + append for multiplexing

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: fix one potential root cause of deadlock issue for non-multiplexing
case

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Add timeout to inflight queue waiting, and also add some extra logging

* feat: allow the Java client lib to handle table schema switches for the same stream name

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
GaoleMeng and gcf-owl-bot[bot] authored Jan 31, 2023
1 parent 305f71e commit 091dddb
Showing 4 changed files with 146 additions and 2 deletions.
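
For orientation before the diff: the change makes the multiplexed connection pool treat a worker whose connection has reached a final (terminal) status as unusable, and evict such workers before picking a connection for the next append. Below is a minimal, self-contained Java sketch of that pattern; Worker, WorkerPool, and pickConnection are simplified illustrative stand-ins, not the library's API (the real logic lives in ConnectionWorker.isConnectionInUnrecoverableState and ConnectionWorkerPool.clearFinalizedConnectionWorker in the diff that follows).

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified stand-in for ConnectionWorker: once a final (terminal) status has been
// recorded for its underlying connection, the worker cannot serve further appends.
final class Worker {
  private volatile Throwable finalStatus;

  boolean isInUnrecoverableState() {
    return finalStatus != null;
  }

  void markFailed(Throwable t) {
    finalStatus = t;
  }
}

// Simplified stand-in for ConnectionWorkerPool: stick to a usable existing worker,
// otherwise drop finalized workers before creating or selecting a replacement.
final class WorkerPool {
  private final List<Worker> workers = new ArrayList<>();

  Worker pickConnection(Worker existing) {
    // Reuse the existing worker only if its connection is still usable.
    if (existing != null && !existing.isInUnrecoverableState()) {
      return existing;
    }
    // Clear finalized workers first so they cannot be selected again.
    clearFinalizedWorkers();
    // Then create a fresh worker (the real pool also weighs load and connection limits).
    Worker created = new Worker();
    workers.add(created);
    return created;
  }

  private void clearFinalizedWorkers() {
    Set<Worker> toRemove = new HashSet<>();
    for (Worker w : workers) {
      if (w.isInUnrecoverableState()) {
        toRemove.add(w);
      }
    }
    workers.removeAll(toRemove);
  }
}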
@@ -378,6 +378,11 @@ public String getWriterId() {
return writerId;
}

boolean isConnectionInUnrecoverableState() {
// If the final status is set, this connection can no longer recover and must not be reused.
return connectionFinalStatus != null;
}

/** Close the stream writer. Shut down all resources. */
@Override
public void close() {
@@ -234,9 +234,17 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows,
streamWriter,
(key, existingStream) -> {
// Stick to the existing stream if it's not overwhelmed.
if (existingStream != null && !existingStream.getLoad().isOverwhelmed()) {
if (existingStream != null
&& !existingStream.getLoad().isOverwhelmed()
&& !existingStream.isConnectionInUnrecoverableState()) {
return existingStream;
}
if (existingStream != null && existingStream.isConnectionInUnrecoverableState()) {
existingStream = null;
}
// Before searching for the next connection to attach to, clear the finalized connections
// first so that they will not be selected.
clearFinalizedConnectionWorker();
// Try to create or find another existing stream to reuse.
ConnectionWorker createdOrExistingConnection = null;
try {
@@ -299,7 +307,6 @@ private ConnectionWorker createOrReuseConnectionWorker(
}
return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema());
} else {

// Stick to the original connection if all the connections are overwhelmed.
if (existingConnectionWorker != null) {
return existingConnectionWorker;
@@ -310,6 +317,18 @@
}
}

private void clearFinalizedConnectionWorker() {
Set<ConnectionWorker> connectionWorkerSet = new HashSet<>();
for (ConnectionWorker existingWorker : connectionWorkerPool) {
if (existingWorker.isConnectionInUnrecoverableState()) {
connectionWorkerSet.add(existingWorker);
}
}
for (ConnectionWorker workerToRemove : connectionWorkerSet) {
connectionWorkerPool.remove(workerToRemove);
}
}

/** Select out the best connection worker among the given connection workers. */
static ConnectionWorker pickBestLoadConnection(
Comparator<Load> comparator, List<ConnectionWorker> connectionWorkerList) {
@@ -31,6 +31,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -450,6 +451,15 @@ static void cleanUp() {
connectionPoolMap.clear();
}

@VisibleForTesting
ConnectionWorkerPool getTestOnlyConnectionWorkerPool() {
ConnectionWorkerPool connectionWorkerPool = null;
for (Entry<ConnectionPoolKey, ConnectionWorkerPool> entry : connectionPoolMap.entrySet()) {
connectionWorkerPool = entry.getValue();
}
return connectionWorkerPool;
}

/** A builder of {@link StreamWriter}s. */
public static final class Builder {
private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L;
@@ -69,6 +69,7 @@ public class StreamWriterTest {
private static final Logger log = Logger.getLogger(StreamWriterTest.class.getName());
private static final String TEST_STREAM_1 = "projects/p/datasets/d1/tables/t1/streams/_default";
private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default";
private static final String TEST_STREAM_3 = "projects/p/datasets/d3/tables/t3/streams/_default";
private static final String TEST_STREAM_SHORTEN = "projects/p/datasets/d2/tables/t2/_default";
private static final String EXPLICIT_STEAM = "projects/p/datasets/d1/tables/t1/streams/s1";
private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
@@ -1090,6 +1091,115 @@ public void testExtractDatasetName() throws Exception {
Assert.assertTrue(ex.getMessage().contains("The passed in stream name does not match"));
}

@Test
public void testRetryInUnrecoverableStatus_MultiplexingCase() throws Exception {
ConnectionWorkerPool.setOptions(
Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(4).build());
ConnectionWorkerPool.enableTestingLogic();

// Setup: create four stream writers, two of them writing to the same stream.
// All four stream writers should be assigned to the same connection.
// 1. Submit three requests at first to trigger the connection retry limitation.
// 2. At this point the connection enters an unrecoverable state.
// 3. Further requests submitted to these stream writers trigger connection reassignment.
StreamWriter writer1 = getMultiplexingStreamWriter(TEST_STREAM_1);
StreamWriter writer2 = getMultiplexingStreamWriter(TEST_STREAM_2);
StreamWriter writer3 = getMultiplexingStreamWriter(TEST_STREAM_3);
StreamWriter writer4 = getMultiplexingStreamWriter(TEST_STREAM_3);

testBigQueryWrite.setCloseForeverAfter(2);
testBigQueryWrite.setTimesToClose(1);
testBigQueryWrite.addResponse(createAppendResponse(0));
testBigQueryWrite.addResponse(createAppendResponse(1));

// The connection will fail after the third append is triggered.
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer1, new String[] {"A"}, 0);
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer2, new String[] {"B"}, 1);
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer3, new String[] {"C"}, 2);
TimeUnit.SECONDS.sleep(1);
assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
assertEquals(1, appendFuture2.get().getAppendResult().getOffset().getValue());
assertThrows(
ExecutionException.class,
() -> {
assertEquals(2, appendFuture3.get().getAppendResult().getOffset().getValue());
});
assertEquals(writer1.getTestOnlyConnectionWorkerPool().getTotalConnectionCount(), 1);
assertEquals(writer1.getTestOnlyConnectionWorkerPool().getCreateConnectionCount(), 1);

// Inserting another request into a writer attached to the closed connection will create another
// connection.

testBigQueryWrite.setCloseForeverAfter(0);
testBigQueryWrite.addResponse(createAppendResponse(4));
testBigQueryWrite.addResponse(createAppendResponse(5));
testBigQueryWrite.addResponse(createAppendResponse(6));
ApiFuture<AppendRowsResponse> appendFuture4 = sendTestMessage(writer4, new String[] {"A"}, 2);
ApiFuture<AppendRowsResponse> appendFuture5 = sendTestMessage(writer1, new String[] {"A"}, 3);
ApiFuture<AppendRowsResponse> appendFuture6 = sendTestMessage(writer2, new String[] {"B"}, 4);
assertEquals(4, appendFuture4.get().getAppendResult().getOffset().getValue());
assertEquals(5, appendFuture5.get().getAppendResult().getOffset().getValue());
assertEquals(6, appendFuture6.get().getAppendResult().getOffset().getValue());
assertEquals(writer1.getTestOnlyConnectionWorkerPool().getTotalConnectionCount(), 1);
assertEquals(writer1.getTestOnlyConnectionWorkerPool().getCreateConnectionCount(), 2);

writer1.close();
writer2.close();
writer3.close();
writer4.close();
assertEquals(writer1.getTestOnlyConnectionWorkerPool().getTotalConnectionCount(), 0);
}

@Test
public void testCloseWhileInUnrecoverableState() throws Exception {
ConnectionWorkerPool.setOptions(
Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(4).build());
ConnectionWorkerPool.enableTestingLogic();

// Setup: create three stream writers.
// 1. Submit three requests at first to trigger the connection retry limitation.
// 2. Submit a request to writer3 to trigger reassignment.
// 3. Closing the previous two writers should be successful.
StreamWriter writer1 = getMultiplexingStreamWriter(TEST_STREAM_1);
StreamWriter writer2 = getMultiplexingStreamWriter(TEST_STREAM_2);
StreamWriter writer3 = getMultiplexingStreamWriter(TEST_STREAM_3);

testBigQueryWrite.setCloseForeverAfter(2);
testBigQueryWrite.setTimesToClose(1);
testBigQueryWrite.addResponse(createAppendResponse(0));
testBigQueryWrite.addResponse(createAppendResponse(1));

// The connection will fail after the third append is triggered.
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer1, new String[] {"A"}, 0);
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer2, new String[] {"B"}, 1);
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer3, new String[] {"C"}, 2);
TimeUnit.SECONDS.sleep(1);
assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
assertEquals(1, appendFuture2.get().getAppendResult().getOffset().getValue());
assertThrows(
ExecutionException.class,
() -> {
assertEquals(2, appendFuture3.get().getAppendResult().getOffset().getValue());
});
assertEquals(writer1.getTestOnlyConnectionWorkerPool().getTotalConnectionCount(), 1);
assertEquals(writer1.getTestOnlyConnectionWorkerPool().getCreateConnectionCount(), 1);

writer1.close();
writer2.close();
// We should still be left with only the one connection that was created.
assertEquals(writer1.getTestOnlyConnectionWorkerPool().getCreateConnectionCount(), 1);
}

public StreamWriter getMultiplexingStreamWriter(String streamName) throws IOException {
return StreamWriter.newBuilder(streamName, client)
.setWriterSchema(createProtoSchema())
.setEnableConnectionPool(true)
.setMaxInflightRequests(10)
.setLocation("US")
.setMaxRetryDuration(java.time.Duration.ofMillis(100))
.build();
}

// Timeout to ensure close() doesn't wait for done callback timeout.
@Test(timeout = 10000)
public void testCloseDisconnectedStream() throws Exception {
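
For a caller-facing view of the behavior the tests above verify, here is a hedged sketch against the BigQuery Storage Write API Java client. It assumes the v1 package imports, the public StreamWriter.append(ProtoRows) entry point, and that the caller already has a BigQueryWriteClient, a ProtoSchema, and ProtoRows to send; the stream name is a placeholder.

import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import java.util.concurrent.ExecutionException;

final class MultiplexedAppendSketch {
  static void appendWithRecovery(BigQueryWriteClient client, ProtoSchema schema, ProtoRows rows)
      throws Exception {
    // A writer that shares the multiplexed connection pool, mirroring
    // getMultiplexingStreamWriter in the test above.
    StreamWriter writer =
        StreamWriter.newBuilder("projects/p/datasets/d/tables/t/streams/_default", client)
            .setWriterSchema(schema)
            .setEnableConnectionPool(true)
            .setLocation("US")
            .build();
    try {
      // If the pooled connection has hit a terminal error, this future completes exceptionally...
      writer.append(rows).get();
    } catch (ExecutionException e) {
      // ...and with this commit the pool also evicts the unrecoverable worker, so the
      // next append is served by a freshly created connection rather than the dead one.
    }
    writer.append(rows).get();
    writer.close();
  }
}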
