Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: some fixes for multiplexing client #1798

Merged
merged 23 commits into from
Sep 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
5a63d95
feat: Split writer into connection worker and wrapper, this is a
GaoleMeng Sep 9, 2022
5a13302
feat: add connection worker pool skeleton, used for multiplexing client
GaoleMeng Sep 13, 2022
0297204
Merge branch 'main' into main
GaoleMeng Sep 14, 2022
8a81ad3
feat: add Load api for connection worker for multiplexing client
GaoleMeng Sep 14, 2022
68fd040
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 14, 2022
3106dae
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 15, 2022
5bf04e5
Merge branch 'googleapis:main' into main
GaoleMeng Sep 15, 2022
2fc7551
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 15, 2022
7a6d919
feat: add multiplexing support to connection worker. We will treat every
GaoleMeng Sep 15, 2022
3ba7659
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
f379a78
Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
9307776
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 16, 2022
de73013
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
19005a1
feat: port the multiplexing client core algorithm and basic tests
GaoleMeng Sep 19, 2022
c5d14ba
Merge branch 'googleapis:main' into main
GaoleMeng Sep 19, 2022
644360a
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 20, 2022
3099d82
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
e707dd6
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
9e7a8fa
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 20, 2022
31f1755
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
44c36fc
feat: wire multiplexing connection pool to stream writer
GaoleMeng Sep 20, 2022
87a4036
feat: some fixes for multiplexing client
GaoleMeng Sep 23, 2022
c92ea1b
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,24 @@
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
<method>com.google.api.core.ApiFuture append(com.google.cloud.bigquery.storage.v1.ProtoRows)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
<method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMaxConnectionsPerPool(int)</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
<method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMaxConnectionsPerRegion(int)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
<method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMinConnectionsPerPool(int)</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
<method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMinConnectionsPerRegion(int)</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
import com.google.common.base.Stopwatch;
Expand Down Expand Up @@ -153,24 +154,24 @@ public abstract static class Settings {
* The minimum connections each pool created before trying to reuse the previously created
* connection in multiplexing mode.
*/
abstract int minConnectionsPerPool();
abstract int minConnectionsPerRegion();

/** The maximum connections per connection pool. */
abstract int maxConnectionsPerPool();
abstract int maxConnectionsPerRegion();

public static Builder builder() {
return new AutoValue_ConnectionWorkerPool_Settings.Builder()
.setMinConnectionsPerPool(2)
.setMaxConnectionsPerPool(10);
.setMinConnectionsPerRegion(2)
.setMaxConnectionsPerRegion(10);
}

/** Builder for the options to config {@link ConnectionWorkerPool}. */
@AutoValue.Builder
public abstract static class Builder {
// TODO(gaole) rename to per location for easier understanding.
public abstract Builder setMinConnectionsPerPool(int value);
public abstract Builder setMinConnectionsPerRegion(int value);

public abstract Builder setMaxConnectionsPerPool(int value);
public abstract Builder setMaxConnectionsPerRegion(int value);

public abstract Settings build();
}
Expand All @@ -192,7 +193,7 @@ public ConnectionWorkerPool(
this.traceId = traceId;
this.client = client;
this.ownsBigQueryWriteClient = ownsBigQueryWriteClient;
this.currentMaxConnectionCount = settings.minConnectionsPerPool();
this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
}

/**
Expand Down Expand Up @@ -266,13 +267,13 @@ private ConnectionWorker createOrReuseConnectionWorker(
ImmutableList.copyOf(connectionWorkerPool));
if (!existingBestConnection.getLoad().isOverwhelmed()) {
return existingBestConnection;
} else if (currentMaxConnectionCount < settings.maxConnectionsPerPool()) {
} else if (currentMaxConnectionCount < settings.maxConnectionsPerRegion()) {
// At this point, we have reached the connection cap and the selected connection is
// overwhelmed, we can try scale up the connection pool.
// The connection count will go up one by one until `maxConnectionsPerPool` is reached.
currentMaxConnectionCount += 1;
if (currentMaxConnectionCount > settings.maxConnectionsPerPool()) {
currentMaxConnectionCount = settings.maxConnectionsPerPool();
if (currentMaxConnectionCount > settings.maxConnectionsPerRegion()) {
currentMaxConnectionCount = settings.maxConnectionsPerRegion();
}
return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema());
} else {
Expand Down Expand Up @@ -323,6 +324,20 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
// Though atomic integer is super lightweight, add extra if check in case adding future logic.
testValueCreateConnectionCount.getAndIncrement();
}
// TODO(gaole): figure out a better way to handle header / request body mismatch
// currently we use different header for the client in each connection worker to be different
// as the backend require the header to have the same write_stream field as request body.
BigQueryWriteClient clientAfterModification = client;
if (ownsBigQueryWriteClient) {
BigQueryWriteSettings settings = client.getSettings();
BigQueryWriteSettings stubSettings =
settings
.toBuilder()
.setHeaderProvider(
FixedHeaderProvider.create("x-goog-request-params", "write_stream=" + streamName))
.build();
clientAfterModification = BigQueryWriteClient.create(stubSettings);
}
ConnectionWorker connectionWorker =
new ConnectionWorker(
streamName,
Expand All @@ -331,7 +346,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
maxInflightBytes,
limitExceededBehavior,
traceId,
client,
clientAfterModification,
ownsBigQueryWriteClient);
connectionWorkerPool.add(connectionWorker);
log.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
Expand Down Expand Up @@ -60,6 +61,7 @@ public class JsonStreamWriter implements AutoCloseable {
private long totalMessageSize = 0;
private long absTotal = 0;
private ProtoSchema protoSchema;
private boolean enableConnectionPool = false;

/**
* Constructs the JsonStreamWriter
Expand All @@ -69,7 +71,6 @@ public class JsonStreamWriter implements AutoCloseable {
private JsonStreamWriter(Builder builder)
throws Descriptors.DescriptorValidationException, IllegalArgumentException, IOException,
InterruptedException {
this.client = builder.client;
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema);

Expand All @@ -83,6 +84,8 @@ private JsonStreamWriter(Builder builder)
builder.endpoint,
builder.flowControlSettings,
builder.traceId);
streamWriterBuilder.setEnableConnectionPool(builder.enableConnectionPool);
streamWriterBuilder.setLocation(builder.location);
this.streamWriter = streamWriterBuilder.build();
this.streamName = builder.streamName;
this.tableSchema = builder.tableSchema;
Expand Down Expand Up @@ -120,8 +123,10 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
throws IOException, DescriptorValidationException {
// Handle schema updates in a Thread-safe way by locking down the operation
synchronized (this) {
TableSchema updatedSchema = this.streamWriter.getUpdatedSchema();
if (updatedSchema != null) {
// Update schema only work when connection pool is not enabled.
if (this.streamWriter.getConnectionOperationType() == Kind.CONNECTION_WORKER
&& this.streamWriter.getUpdatedSchema() != null) {
TableSchema updatedSchema = this.streamWriter.getUpdatedSchema();
// Close the StreamWriter
this.streamWriter.close();
// Update JsonStreamWriter's TableSchema and Descriptor
Expand Down Expand Up @@ -312,6 +317,9 @@ public static final class Builder {
private String traceId;
private boolean ignoreUnknownFields = false;
private boolean reconnectAfter10M = false;
// Indicte whether multiplexing mode is enabled.
private boolean enableConnectionPool = false;
private String location;

private static String streamPatternString =
"(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
Expand Down Expand Up @@ -450,6 +458,31 @@ public Builder setReconnectAfter10M(boolean reconnectAfter10M) {
return this;
}

/**
* Enable multiplexing for this writer. In multiplexing mode tables will share the same
* connection if possible until the connection is overwhelmed. This feature is still under
* development, please contact write api team before using.
*
* @param enableConnectionPool
* @return Builder
*/
public Builder setEnableConnectionPool(boolean enableConnectionPool) {
this.enableConnectionPool = enableConnectionPool;
return this;
}

/**
* Location of the table this stream writer is targeting. Connection pools are shared by
* location.
*
* @param location
* @return Builder
*/
public Builder setLocation(String location) {
this.location = location;
return this;
}

/**
* Builds JsonStreamWriter
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auto.value.AutoOneOf;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.StreamWriter.Builder.ConnectionMode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.Status;
Expand All @@ -33,6 +32,7 @@
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
Expand Down Expand Up @@ -180,7 +180,7 @@ private StreamWriter(Builder builder) throws IOException {
client = builder.client;
ownsBigQueryWriteClient = false;
}
if (builder.connectionMode == ConnectionMode.SINGLE_TABLE) {
if (!builder.enableConnectionPool) {
this.singleConnectionOrConnectionPool =
SingleConnectionOrConnectionPool.ofSingleConnection(
new ConnectionWorker(
Expand Down Expand Up @@ -212,22 +212,31 @@ private StreamWriter(Builder builder) throws IOException {
builder.traceId,
client,
ownsBigQueryWriteClient)));
validateFetchedConnectonPool(client, builder);
validateFetchedConnectonPool(builder);
// Shut down the passed in client. Internally we will create another client inside connection
// pool for every new connection worker.
// TODO(gaole): instead of perform close outside of pool approach, change to always create
// new client in connection.
if (client != singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient()
&& ownsBigQueryWriteClient) {
client.shutdown();
try {
client.awaitTermination(150, TimeUnit.SECONDS);
} catch (InterruptedException unused) {
// Ignore interruption as this client is not used.
}
client.close();
}
}
}

// Validate whether the fetched connection pool matched certain properties.
private void validateFetchedConnectonPool(
BigQueryWriteClient client, StreamWriter.Builder builder) {
private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
String paramsValidatedFailed = "";
if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(),
builder.traceId)) {
paramsValidatedFailed = "Trace id";
} else if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient(),
client)) {
paramsValidatedFailed = "Bigquery write client";
} else if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(),
builder.limitExceededBehavior)) {
Expand Down Expand Up @@ -341,19 +350,6 @@ SingleConnectionOrConnectionPool.Kind getConnectionOperationType() {

/** A builder of {@link StreamWriter}s. */
public static final class Builder {
/** Operation mode for the internal connection pool. */
public enum ConnectionMode {
// Create a connection per given write stream.
SINGLE_TABLE,
// Share a connection for multiple tables. This mode is only effective in default stream case.
// Some key characteristics:
// 1. tables within the same pool has to be in the same location.
// 2. Close(streamReference) will not close connection immediately until all tables on
// this connection is closed.
// 3. Try to use one stream per table at first and share stream later.
MULTIPLEXING
}

private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L;

private static final long DEFAULT_MAX_INFLIGHT_BYTES = 100 * 1024 * 1024; // 100Mb.
Expand All @@ -379,14 +375,14 @@ public enum ConnectionMode {
private FlowController.LimitExceededBehavior limitExceededBehavior =
FlowController.LimitExceededBehavior.Block;

private ConnectionMode connectionMode = ConnectionMode.SINGLE_TABLE;

private String traceId = null;

private TableSchema updatedTableSchema = null;

private String location;

private boolean enableConnectionPool = false;

private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
Expand Down Expand Up @@ -419,8 +415,16 @@ public Builder setEndpoint(String endpoint) {
return this;
}

public Builder enableConnectionPool() {
this.connectionMode = ConnectionMode.MULTIPLEXING;
/**
* Enable multiplexing for this writer. In multiplexing mode tables will share the same
* connection if possible until the connection is overwhelmed. This feature is still under
* development, please contact write api team before using.
*
* @param enableConnectionPool
* @return Builder
*/
public Builder setEnableConnectionPool(boolean enableConnectionPool) {
this.enableConnectionPool = enableConnectionPool;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ private void testSendRequestsToMultiTable(
throws IOException, ExecutionException, InterruptedException {
ConnectionWorkerPool.setOptions(
Settings.builder()
.setMinConnectionsPerPool(2)
.setMaxConnectionsPerPool(maxConnections)
.setMinConnectionsPerRegion(2)
.setMaxConnectionsPerRegion(maxConnections)
.build());
ConnectionWorkerPool connectionWorkerPool =
createConnectionWorkerPool(maxRequests, /*maxBytes=*/ 100000);
Expand Down Expand Up @@ -201,7 +201,7 @@ private void testSendRequestsToMultiTable(
@Test
public void testMultiStreamClosed_multiplexingEnabled() throws Exception {
ConnectionWorkerPool.setOptions(
Settings.builder().setMaxConnectionsPerPool(10).setMinConnectionsPerPool(5).build());
Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
ConnectionWorkerPool connectionWorkerPool =
createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 1000);

Expand Down Expand Up @@ -250,7 +250,7 @@ public void testMultiStreamClosed_multiplexingEnabled() throws Exception {
@Test
public void testMultiStreamAppend_appendWhileClosing() throws Exception {
ConnectionWorkerPool.setOptions(
Settings.builder().setMaxConnectionsPerPool(10).setMinConnectionsPerPool(5).build());
Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
ConnectionWorkerPool connectionWorkerPool =
createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 100000);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private StreamWriter getMultiplexingTestStreamWriter() throws IOException {
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.setLocation("US")
.enableConnectionPool()
.setEnableConnectionPool(true)
.build();
}

Expand Down