feat: some fixes for multiplexing client (#1798)
* feat: Split writer into connection worker and wrapper; this is a
prerequisite for the multiplexing client

* feat: add connection worker pool skeleton, used by the multiplexing client

* feat: add a Load API to the connection worker for the multiplexing client

* feat: add multiplexing support to the connection worker. Every new
stream name is treated as a switch of destination

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: port the multiplexing client core algorithm and basic tests;
also fixed a tiny bug inside the fake BigQuery write impl when getting the
response for an offset

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: wire multiplexing connection pool to stream writer

* feat: some fixes for multiplexing client

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
GaoleMeng and gcf-owl-bot[bot] authored Sep 23, 2022
1 parent f6083be commit b3ffd77
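
The net effect of the series above is that callers can opt into connection multiplexing through the writer builders. Below is a minimal usage sketch, not taken from the diff: the stream name, location, and column name are placeholders, setEnableConnectionPool and setLocation are the methods introduced in this change, and everything else is the pre-existing JsonStreamWriter API.

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import org.json.JSONArray;
import org.json.JSONObject;

public class MultiplexingWriterSketch {
  public static void appendOneRow(TableSchema tableSchema) throws Exception {
    String defaultStream =
        "projects/my-project/datasets/my_dataset/tables/my_table/_default"; // placeholder
    try (JsonStreamWriter writer =
        JsonStreamWriter.newBuilder(defaultStream, tableSchema)
            .setEnableConnectionPool(true) // share connections across writers (multiplexing)
            .setLocation("us-central1")    // placeholder; pools are keyed per location
            .build()) {
      JSONArray rows = new JSONArray();
      rows.put(new JSONObject().put("col1", "value1")); // placeholder column
      ApiFuture<AppendRowsResponse> response = writer.append(rows);
      response.get(); // block until the append is acknowledged
    }
  }
}
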
Showing 6 changed files with 117 additions and 45 deletions.
20 changes: 20 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
@@ -40,4 +40,24 @@
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
<method>com.google.api.core.ApiFuture append(com.google.cloud.bigquery.storage.v1.ProtoRows)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
<method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMaxConnectionsPerPool(int)</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
<method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMaxConnectionsPerRegion(int)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
<method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMinConnectionsPerPool(int)</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
<method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMinConnectionsPerRegion(int)</method>
</difference>
</differences>
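
These new clirr entries record a rename on ConnectionWorkerPool.Settings.Builder: the per-pool connection limits become per-region (per-location) limits. A short sketch of the renamed builder follows, mirroring how the updated tests configure the pool; the values are the defaults used by Settings.builder() below, and setOptions is shown the way the tests call it (it may only be accessible inside the library's own package).

import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool;
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings;

public class PoolSettingsSketch {
  public static void configurePool() {
    // Removed: setMinConnectionsPerPool / setMaxConnectionsPerPool
    // Added:   setMinConnectionsPerRegion / setMaxConnectionsPerRegion
    Settings settings =
        Settings.builder()
            .setMinConnectionsPerRegion(2)  // default from Settings.builder()
            .setMaxConnectionsPerRegion(10) // default from Settings.builder()
            .build();
    ConnectionWorkerPool.setOptions(settings); // as the updated tests do
  }
}
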
@@ -17,6 +17,7 @@

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
import com.google.common.base.Stopwatch;
@@ -153,24 +154,24 @@ public abstract static class Settings {
* The minimum connections each pool created before trying to reuse the previously created
* connection in multiplexing mode.
*/
abstract int minConnectionsPerPool();
abstract int minConnectionsPerRegion();

/** The maximum connections per connection pool. */
abstract int maxConnectionsPerPool();
abstract int maxConnectionsPerRegion();

public static Builder builder() {
return new AutoValue_ConnectionWorkerPool_Settings.Builder()
.setMinConnectionsPerPool(2)
.setMaxConnectionsPerPool(10);
.setMinConnectionsPerRegion(2)
.setMaxConnectionsPerRegion(10);
}

/** Builder for the options to config {@link ConnectionWorkerPool}. */
@AutoValue.Builder
public abstract static class Builder {
// TODO(gaole) rename to per location for easier understanding.
public abstract Builder setMinConnectionsPerPool(int value);
public abstract Builder setMinConnectionsPerRegion(int value);

public abstract Builder setMaxConnectionsPerPool(int value);
public abstract Builder setMaxConnectionsPerRegion(int value);

public abstract Settings build();
}
@@ -192,7 +193,7 @@ public ConnectionWorkerPool(
this.traceId = traceId;
this.client = client;
this.ownsBigQueryWriteClient = ownsBigQueryWriteClient;
this.currentMaxConnectionCount = settings.minConnectionsPerPool();
this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
}

/**
@@ -266,13 +267,13 @@ private ConnectionWorker createOrReuseConnectionWorker(
ImmutableList.copyOf(connectionWorkerPool));
if (!existingBestConnection.getLoad().isOverwhelmed()) {
return existingBestConnection;
} else if (currentMaxConnectionCount < settings.maxConnectionsPerPool()) {
} else if (currentMaxConnectionCount < settings.maxConnectionsPerRegion()) {
// At this point, we have reached the connection cap and the selected connection is
// overwhelmed, we can try scale up the connection pool.
// The connection count will go up one by one until `maxConnectionsPerPool` is reached.
currentMaxConnectionCount += 1;
if (currentMaxConnectionCount > settings.maxConnectionsPerPool()) {
currentMaxConnectionCount = settings.maxConnectionsPerPool();
if (currentMaxConnectionCount > settings.maxConnectionsPerRegion()) {
currentMaxConnectionCount = settings.maxConnectionsPerRegion();
}
return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema());
} else {
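
The hunk above is the heart of the pool-sizing rule: pick the least-loaded existing worker, reuse it unless it reports itself overwhelmed, and only then grow the pool one connection at a time, capped at maxConnectionsPerRegion. The following condensed restatement is illustrative only; Worker, load() and the Supplier are hypothetical stand-ins for the real ConnectionWorker plumbing, and the else branch at the cap is not shown in this hunk.

import java.util.Comparator;
import java.util.List;
import java.util.function.Supplier;

final class PoolSizingSketch {
  private int currentMaxConnectionCount; // starts at minConnectionsPerRegion

  interface Worker {
    double load();            // stand-in for ConnectionWorker.Load
    boolean isOverwhelmed();
  }

  Worker chooseOrGrow(List<Worker> pool, int maxConnectionsPerRegion, Supplier<Worker> create) {
    Worker best =
        pool.stream().min(Comparator.comparingDouble(Worker::load)).orElseGet(create);
    if (!best.isOverwhelmed()) {
      return best; // reuse the least-loaded existing connection
    }
    if (currentMaxConnectionCount < maxConnectionsPerRegion) {
      // Scale up one connection at a time, never past the per-region cap.
      currentMaxConnectionCount = Math.min(currentMaxConnectionCount + 1, maxConnectionsPerRegion);
      return create.get();
    }
    // At the cap (else branch not shown in the hunk above): presumably fall back
    // to an existing, busy connection rather than opening a new one.
    return best;
  }
}
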
@@ -323,6 +324,20 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
// Though atomic integer is super lightweight, add extra if check in case adding future logic.
testValueCreateConnectionCount.getAndIncrement();
}
// TODO(gaole): figure out a better way to handle header / request body mismatch
// currently we use different header for the client in each connection worker to be different
// as the backend require the header to have the same write_stream field as request body.
BigQueryWriteClient clientAfterModification = client;
if (ownsBigQueryWriteClient) {
BigQueryWriteSettings settings = client.getSettings();
BigQueryWriteSettings stubSettings =
settings
.toBuilder()
.setHeaderProvider(
FixedHeaderProvider.create("x-goog-request-params", "write_stream=" + streamName))
.build();
clientAfterModification = BigQueryWriteClient.create(stubSettings);
}
ConnectionWorker connectionWorker =
new ConnectionWorker(
streamName,
@@ -331,7 +346,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
maxInflightBytes,
limitExceededBehavior,
traceId,
client,
clientAfterModification,
ownsBigQueryWriteClient);
connectionWorkerPool.add(connectionWorker);
log.info(
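
The other notable addition in this file is the per-worker client above: when the pool owns the BigQueryWriteClient, each new connection worker gets its own client whose x-goog-request-params header pins write_stream to that worker's stream, so the routing header always matches the request body. The same pattern is shown standalone below as a sketch; the class and method names are illustrative, and the caller is assumed to already hold a BigQueryWriteSettings instance.

import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import java.io.IOException;

final class PerStreamClientSketch {
  // Build a client whose routing header is fixed to the stream this connection will serve.
  static BigQueryWriteClient clientForStream(BigQueryWriteSettings baseSettings, String streamName)
      throws IOException {
    BigQueryWriteSettings perStreamSettings =
        baseSettings
            .toBuilder()
            .setHeaderProvider(
                FixedHeaderProvider.create("x-goog-request-params", "write_stream=" + streamName))
            .build();
    return BigQueryWriteClient.create(perStreamSettings);
  }
}
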
@@ -20,6 +20,7 @@
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
@@ -60,6 +61,7 @@ public class JsonStreamWriter implements AutoCloseable {
private long totalMessageSize = 0;
private long absTotal = 0;
private ProtoSchema protoSchema;
private boolean enableConnectionPool = false;

/**
* Constructs the JsonStreamWriter
@@ -69,7 +71,6 @@ public class JsonStreamWriter implements AutoCloseable {
private JsonStreamWriter(Builder builder)
throws Descriptors.DescriptorValidationException, IllegalArgumentException, IOException,
InterruptedException {
this.client = builder.client;
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema);

@@ -83,6 +84,8 @@ private JsonStreamWriter(Builder builder)
builder.endpoint,
builder.flowControlSettings,
builder.traceId);
streamWriterBuilder.setEnableConnectionPool(builder.enableConnectionPool);
streamWriterBuilder.setLocation(builder.location);
this.streamWriter = streamWriterBuilder.build();
this.streamName = builder.streamName;
this.tableSchema = builder.tableSchema;
@@ -120,8 +123,10 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
throws IOException, DescriptorValidationException {
// Handle schema updates in a Thread-safe way by locking down the operation
synchronized (this) {
TableSchema updatedSchema = this.streamWriter.getUpdatedSchema();
if (updatedSchema != null) {
// Update schema only work when connection pool is not enabled.
if (this.streamWriter.getConnectionOperationType() == Kind.CONNECTION_WORKER
&& this.streamWriter.getUpdatedSchema() != null) {
TableSchema updatedSchema = this.streamWriter.getUpdatedSchema();
// Close the StreamWriter
this.streamWriter.close();
// Update JsonStreamWriter's TableSchema and Descriptor
@@ -312,6 +317,9 @@ public static final class Builder {
private String traceId;
private boolean ignoreUnknownFields = false;
private boolean reconnectAfter10M = false;
// Indicate whether multiplexing mode is enabled.
private boolean enableConnectionPool = false;
private String location;

private static String streamPatternString =
"(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
@@ -450,6 +458,31 @@ public Builder setReconnectAfter10M(boolean reconnectAfter10M) {
return this;
}

/**
* Enable multiplexing for this writer. In multiplexing mode tables will share the same
* connection if possible until the connection is overwhelmed. This feature is still under
* development, please contact write api team before using.
*
* @param enableConnectionPool
* @return Builder
*/
public Builder setEnableConnectionPool(boolean enableConnectionPool) {
this.enableConnectionPool = enableConnectionPool;
return this;
}

/**
* Location of the table this stream writer is targeting. Connection pools are shared by
* location.
*
* @param location
* @return Builder
*/
public Builder setLocation(String location) {
this.location = location;
return this;
}

/**
* Builds JsonStreamWriter
*
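
One behavioral consequence of the JsonStreamWriter changes above: the automatic table-schema refresh inside append() now runs only in single-connection mode, so with the connection pool enabled an updated TableSchema is not picked up on the fly. The sketch below shows one purely illustrative way an application could cope, by rebuilding the writer itself when it learns of a schema change; the strategy is not prescribed by this diff, and SchemaRefreshSketch, onSchemaChanged, and the location value are hypothetical.

import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableSchema;

final class SchemaRefreshSketch {
  private JsonStreamWriter writer;

  // Hypothetical helper: rebuild the writer against the new schema, since pool mode
  // skips the automatic refresh that single-connection mode performs in append().
  void onSchemaChanged(String defaultStream, TableSchema newSchema) throws Exception {
    if (writer != null) {
      writer.close();
    }
    writer =
        JsonStreamWriter.newBuilder(defaultStream, newSchema)
            .setEnableConnectionPool(true)
            .setLocation("us-central1") // placeholder; must match the table's location
            .build();
  }
}
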
@@ -22,7 +22,6 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auto.value.AutoOneOf;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.StreamWriter.Builder.ConnectionMode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.Status;
@@ -33,6 +32,7 @@
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
@@ -180,7 +180,7 @@ private StreamWriter(Builder builder) throws IOException {
client = builder.client;
ownsBigQueryWriteClient = false;
}
if (builder.connectionMode == ConnectionMode.SINGLE_TABLE) {
if (!builder.enableConnectionPool) {
this.singleConnectionOrConnectionPool =
SingleConnectionOrConnectionPool.ofSingleConnection(
new ConnectionWorker(
@@ -212,22 +212,31 @@ private StreamWriter(Builder builder) throws IOException {
builder.traceId,
client,
ownsBigQueryWriteClient)));
validateFetchedConnectonPool(client, builder);
validateFetchedConnectonPool(builder);
// Shut down the passed in client. Internally we will create another client inside connection
// pool for every new connection worker.
// TODO(gaole): instead of perform close outside of pool approach, change to always create
// new client in connection.
if (client != singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient()
&& ownsBigQueryWriteClient) {
client.shutdown();
try {
client.awaitTermination(150, TimeUnit.SECONDS);
} catch (InterruptedException unused) {
// Ignore interruption as this client is not used.
}
client.close();
}
}
}

// Validate whether the fetched connection pool matched certain properties.
private void validateFetchedConnectonPool(
BigQueryWriteClient client, StreamWriter.Builder builder) {
private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
String paramsValidatedFailed = "";
if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(),
builder.traceId)) {
paramsValidatedFailed = "Trace id";
} else if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient(),
client)) {
paramsValidatedFailed = "Bigquery write client";
} else if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(),
builder.limitExceededBehavior)) {
@@ -341,19 +350,6 @@ SingleConnectionOrConnectionPool.Kind getConnectionOperationType() {

/** A builder of {@link StreamWriter}s. */
public static final class Builder {
/** Operation mode for the internal connection pool. */
public enum ConnectionMode {
// Create a connection per given write stream.
SINGLE_TABLE,
// Share a connection for multiple tables. This mode is only effective in default stream case.
// Some key characteristics:
// 1. tables within the same pool has to be in the same location.
// 2. Close(streamReference) will not close connection immediately until all tables on
// this connection is closed.
// 3. Try to use one stream per table at first and share stream later.
MULTIPLEXING
}

private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L;

private static final long DEFAULT_MAX_INFLIGHT_BYTES = 100 * 1024 * 1024; // 100Mb.
@@ -379,14 +375,14 @@ public enum ConnectionMode {
private FlowController.LimitExceededBehavior limitExceededBehavior =
FlowController.LimitExceededBehavior.Block;

private ConnectionMode connectionMode = ConnectionMode.SINGLE_TABLE;

private String traceId = null;

private TableSchema updatedTableSchema = null;

private String location;

private boolean enableConnectionPool = false;

private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
@@ -419,8 +415,16 @@ public Builder setEndpoint(String endpoint) {
return this;
}

public Builder enableConnectionPool() {
this.connectionMode = ConnectionMode.MULTIPLEXING;
/**
* Enable multiplexing for this writer. In multiplexing mode tables will share the same
* connection if possible until the connection is overwhelmed. This feature is still under
* development, please contact write api team before using.
*
* @param enableConnectionPool
* @return Builder
*/
public Builder setEnableConnectionPool(boolean enableConnectionPool) {
this.enableConnectionPool = enableConnectionPool;
return this;
}

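
On the lower-level StreamWriter, the Builder.ConnectionMode enum and the enableConnectionPool() method are gone in favor of a boolean setter that matches JsonStreamWriter, and when the writer owns the client it now shuts that client down cleanly (shutdown, awaitTermination, close) once the pool has created its own per-connection clients. A sketch of the updated builder call follows, mirroring the test change further down; the stream name, schema, and location are placeholders supplied by the caller.

import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import java.io.IOException;

final class PooledStreamWriterSketch {
  static StreamWriter buildPooledWriter(String defaultStream, ProtoSchema schema)
      throws IOException {
    return StreamWriter.newBuilder(defaultStream)
        .setWriterSchema(schema)
        .setEnableConnectionPool(true) // replaces enableConnectionPool() / ConnectionMode
        .setLocation("us-central1")    // placeholder; pools are shared per location
        .build();
  }
}
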
@@ -146,8 +146,8 @@ private void testSendRequestsToMultiTable(
throws IOException, ExecutionException, InterruptedException {
ConnectionWorkerPool.setOptions(
Settings.builder()
.setMinConnectionsPerPool(2)
.setMaxConnectionsPerPool(maxConnections)
.setMinConnectionsPerRegion(2)
.setMaxConnectionsPerRegion(maxConnections)
.build());
ConnectionWorkerPool connectionWorkerPool =
createConnectionWorkerPool(maxRequests, /*maxBytes=*/ 100000);
@@ -201,7 +201,7 @@ private void testSendRequestsToMultiTable(
@Test
public void testMultiStreamClosed_multiplexingEnabled() throws Exception {
ConnectionWorkerPool.setOptions(
Settings.builder().setMaxConnectionsPerPool(10).setMinConnectionsPerPool(5).build());
Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
ConnectionWorkerPool connectionWorkerPool =
createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 1000);

@@ -250,7 +250,7 @@ public void testMultiStreamClosed_multiplexingEnabled() throws Exception {
@Test
public void testMultiStreamAppend_appendWhileClosing() throws Exception {
ConnectionWorkerPool.setOptions(
Settings.builder().setMaxConnectionsPerPool(10).setMinConnectionsPerPool(5).build());
Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
ConnectionWorkerPool connectionWorkerPool =
createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 100000);

@@ -96,7 +96,7 @@ private StreamWriter getMultiplexingTestStreamWriter() throws IOException {
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.setLocation("US")
.enableConnectionPool()
.setEnableConnectionPool(true)
.build();
}

