diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
index 69e67b9464..d8caaa07bf 100644
--- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml
+++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
@@ -40,4 +40,24 @@
    <className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
    <method>com.google.api.core.ApiFuture append(com.google.cloud.bigquery.storage.v1.ProtoRows)</method>
  </difference>
+  <difference>
+    <differenceType>7002</differenceType>
+    <className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
+    <method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMaxConnectionsPerPool(int)</method>
+  </difference>
+  <difference>
+    <differenceType>7013</differenceType>
+    <className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
+    <method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMaxConnectionsPerRegion(int)</method>
+  </difference>
+  <difference>
+    <differenceType>7002</differenceType>
+    <className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
+    <method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMinConnectionsPerPool(int)</method>
+  </difference>
+  <difference>
+    <differenceType>7013</differenceType>
+    <className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
+    <method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMinConnectionsPerRegion(int)</method>
+  </difference>
  </differences>
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
index 7b0d3a2964..bdddeca12d 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
@@ -17,6 +17,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.FlowController;
+import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
import com.google.common.base.Stopwatch;
@@ -153,24 +154,24 @@ public abstract static class Settings {
* The minimum connections each pool created before trying to reuse the previously created
* connection in multiplexing mode.
*/
- abstract int minConnectionsPerPool();
+ abstract int minConnectionsPerRegion();
/** The maximum connections per connection pool. */
- abstract int maxConnectionsPerPool();
+ abstract int maxConnectionsPerRegion();
public static Builder builder() {
return new AutoValue_ConnectionWorkerPool_Settings.Builder()
- .setMinConnectionsPerPool(2)
- .setMaxConnectionsPerPool(10);
+ .setMinConnectionsPerRegion(2)
+ .setMaxConnectionsPerRegion(10);
}
/** Builder for the options to config {@link ConnectionWorkerPool}. */
@AutoValue.Builder
public abstract static class Builder {
// TODO(gaole) rename to per location for easier understanding.
- public abstract Builder setMinConnectionsPerPool(int value);
+ public abstract Builder setMinConnectionsPerRegion(int value);
- public abstract Builder setMaxConnectionsPerPool(int value);
+ public abstract Builder setMaxConnectionsPerRegion(int value);
public abstract Settings build();
}
@@ -192,7 +193,7 @@ public ConnectionWorkerPool(
this.traceId = traceId;
this.client = client;
this.ownsBigQueryWriteClient = ownsBigQueryWriteClient;
- this.currentMaxConnectionCount = settings.minConnectionsPerPool();
+ this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
}
/**
@@ -266,13 +267,13 @@ private ConnectionWorker createOrReuseConnectionWorker(
ImmutableList.copyOf(connectionWorkerPool));
if (!existingBestConnection.getLoad().isOverwhelmed()) {
return existingBestConnection;
- } else if (currentMaxConnectionCount < settings.maxConnectionsPerPool()) {
+ } else if (currentMaxConnectionCount < settings.maxConnectionsPerRegion()) {
// At this point, we have reached the connection cap and the selected connection is
// overwhelmed, we can try scale up the connection pool.
// The connection count will go up one by one until `maxConnectionsPerPool` is reached.
currentMaxConnectionCount += 1;
- if (currentMaxConnectionCount > settings.maxConnectionsPerPool()) {
- currentMaxConnectionCount = settings.maxConnectionsPerPool();
+ if (currentMaxConnectionCount > settings.maxConnectionsPerRegion()) {
+ currentMaxConnectionCount = settings.maxConnectionsPerRegion();
}
return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema());
} else {
@@ -323,6 +324,20 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
// Though atomic integer is super lightweight, add extra if check in case adding future logic.
testValueCreateConnectionCount.getAndIncrement();
}
+ // TODO(gaole): figure out a better way to handle the header / request body mismatch.
+ // Currently we create a separate client with a different header for each connection worker,
+ // because the backend requires the header's write_stream field to match the request body.
+ BigQueryWriteClient clientAfterModification = client;
+ if (ownsBigQueryWriteClient) {
+ BigQueryWriteSettings settings = client.getSettings();
+ BigQueryWriteSettings stubSettings =
+ settings
+ .toBuilder()
+ .setHeaderProvider(
+ FixedHeaderProvider.create("x-goog-request-params", "write_stream=" + streamName))
+ .build();
+ clientAfterModification = BigQueryWriteClient.create(stubSettings);
+ }
ConnectionWorker connectionWorker =
new ConnectionWorker(
streamName,
@@ -331,7 +346,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
maxInflightBytes,
limitExceededBehavior,
traceId,
- client,
+ clientAfterModification,
ownsBigQueryWriteClient);
connectionWorkerPool.add(connectionWorker);
log.info(
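
For reference, the header fix above amounts to rebuilding each worker's client with a fixed `x-goog-request-params` routing header naming the stream that worker serves. A minimal, self-contained sketch of that construction (the wrapper class and method name are illustrative; the gax and BigQuery Storage calls are the ones used in the hunk):

```java
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import java.io.IOException;

public class PerStreamClientSketch {
  // Rebuilds the write client so its routing header carries the same write_stream value that
  // the worker's append requests will carry in the request body.
  static BigQueryWriteClient clientForStream(BigQueryWriteSettings baseSettings, String streamName)
      throws IOException {
    BigQueryWriteSettings perStreamSettings =
        baseSettings
            .toBuilder()
            .setHeaderProvider(
                FixedHeaderProvider.create("x-goog-request-params", "write_stream=" + streamName))
            .build();
    return BigQueryWriteClient.create(perStreamSettings);
  }
}
```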
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
index 6ef655781b..d3abd647e4 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
@@ -20,6 +20,7 @@
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
+import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
@@ -60,6 +61,7 @@ public class JsonStreamWriter implements AutoCloseable {
private long totalMessageSize = 0;
private long absTotal = 0;
private ProtoSchema protoSchema;
+ private boolean enableConnectionPool = false;
/**
* Constructs the JsonStreamWriter
@@ -69,7 +71,6 @@ public class JsonStreamWriter implements AutoCloseable {
private JsonStreamWriter(Builder builder)
throws Descriptors.DescriptorValidationException, IllegalArgumentException, IOException,
InterruptedException {
- this.client = builder.client;
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema);
@@ -83,6 +84,8 @@ private JsonStreamWriter(Builder builder)
builder.endpoint,
builder.flowControlSettings,
builder.traceId);
+ streamWriterBuilder.setEnableConnectionPool(builder.enableConnectionPool);
+ streamWriterBuilder.setLocation(builder.location);
this.streamWriter = streamWriterBuilder.build();
this.streamName = builder.streamName;
this.tableSchema = builder.tableSchema;
@@ -120,8 +123,10 @@ public ApiFuture append(JSONArray jsonArr, long offset)
throws IOException, DescriptorValidationException {
// Handle schema updates in a Thread-safe way by locking down the operation
synchronized (this) {
- TableSchema updatedSchema = this.streamWriter.getUpdatedSchema();
- if (updatedSchema != null) {
+ // Schema updates only work when the connection pool is not enabled.
+ if (this.streamWriter.getConnectionOperationType() == Kind.CONNECTION_WORKER
+ && this.streamWriter.getUpdatedSchema() != null) {
+ TableSchema updatedSchema = this.streamWriter.getUpdatedSchema();
// Close the StreamWriter
this.streamWriter.close();
// Update JsonStreamWriter's TableSchema and Descriptor
@@ -312,6 +317,9 @@ public static final class Builder {
private String traceId;
private boolean ignoreUnknownFields = false;
private boolean reconnectAfter10M = false;
+ // Indicates whether multiplexing mode is enabled.
+ private boolean enableConnectionPool = false;
+ private String location;
private static String streamPatternString =
"(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
@@ -450,6 +458,31 @@ public Builder setReconnectAfter10M(boolean reconnectAfter10M) {
return this;
}
+ /**
+ * Enable multiplexing for this writer. In multiplexing mode, tables will share the same
+ * connection if possible until the connection is overwhelmed. This feature is still under
+ * development; please contact the BigQuery Write API team before using it.
+ *
+ * @param enableConnectionPool whether to enable the shared connection pool
+ * @return Builder
+ */
+ public Builder setEnableConnectionPool(boolean enableConnectionPool) {
+ this.enableConnectionPool = enableConnectionPool;
+ return this;
+ }
+
+ /**
+ * Location of the table this stream writer is targeting. Connection pools are shared by
+ * location.
+ *
+ * @param location
+ * @return Builder
+ */
+ public Builder setLocation(String location) {
+ this.location = location;
+ return this;
+ }
+
/**
* Builds JsonStreamWriter
*
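
A hedged usage sketch of the two new JsonStreamWriter builder options; the table path, location, schema, and column name are placeholders, and the JSON/append calls are the library's existing API:

```java
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import org.json.JSONArray;
import org.json.JSONObject;

public class MultiplexedJsonWriterSketch {
  // Builds a JsonStreamWriter against a table's default stream with the connection pool enabled.
  static void writeOnce(TableSchema tableSchema) throws Exception {
    try (JsonStreamWriter writer =
        JsonStreamWriter.newBuilder("projects/p/datasets/d/tables/t/streams/_default", tableSchema)
            .setEnableConnectionPool(true)
            .setLocation("US")
            .build()) {
      JSONArray rows = new JSONArray().put(new JSONObject().put("col", "value"));
      // Block until the append is acknowledged by the backend.
      writer.append(rows).get();
    }
  }
}
```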
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
index 180ee81d94..be6c10dff8 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
@@ -22,7 +22,6 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auto.value.AutoOneOf;
import com.google.auto.value.AutoValue;
-import com.google.cloud.bigquery.storage.v1.StreamWriter.Builder.ConnectionMode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.Status;
@@ -33,6 +32,7 @@
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
/**
@@ -180,7 +180,7 @@ private StreamWriter(Builder builder) throws IOException {
client = builder.client;
ownsBigQueryWriteClient = false;
}
- if (builder.connectionMode == ConnectionMode.SINGLE_TABLE) {
+ if (!builder.enableConnectionPool) {
this.singleConnectionOrConnectionPool =
SingleConnectionOrConnectionPool.ofSingleConnection(
new ConnectionWorker(
@@ -212,22 +212,31 @@ private StreamWriter(Builder builder) throws IOException {
builder.traceId,
client,
ownsBigQueryWriteClient)));
- validateFetchedConnectonPool(client, builder);
+ validateFetchedConnectonPool(builder);
+ // Shut down the passed-in client. Internally the connection pool creates a new client for
+ // every new connection worker, so this client is no longer used.
+ // TODO(gaole): instead of closing the client outside of the pool, change the pool to always
+ // create a new client per connection.
+ if (client != singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient()
+ && ownsBigQueryWriteClient) {
+ client.shutdown();
+ try {
+ client.awaitTermination(150, TimeUnit.SECONDS);
+ } catch (InterruptedException unused) {
+ // Ignore interruption as this client is not used.
+ }
+ client.close();
+ }
}
}
// Validate whether the fetched connection pool matched certain properties.
- private void validateFetchedConnectonPool(
- BigQueryWriteClient client, StreamWriter.Builder builder) {
+ private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
String paramsValidatedFailed = "";
if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(),
builder.traceId)) {
paramsValidatedFailed = "Trace id";
- } else if (!Objects.equals(
- this.singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient(),
- client)) {
- paramsValidatedFailed = "Bigquery write client";
} else if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(),
builder.limitExceededBehavior)) {
@@ -341,19 +350,6 @@ SingleConnectionOrConnectionPool.Kind getConnectionOperationType() {
/** A builder of {@link StreamWriter}s. */
public static final class Builder {
- /** Operation mode for the internal connection pool. */
- public enum ConnectionMode {
- // Create a connection per given write stream.
- SINGLE_TABLE,
- // Share a connection for multiple tables. This mode is only effective in default stream case.
- // Some key characteristics:
- // 1. tables within the same pool has to be in the same location.
- // 2. Close(streamReference) will not close connection immediately until all tables on
- // this connection is closed.
- // 3. Try to use one stream per table at first and share stream later.
- MULTIPLEXING
- }
-
private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L;
private static final long DEFAULT_MAX_INFLIGHT_BYTES = 100 * 1024 * 1024; // 100Mb.
@@ -379,14 +375,14 @@ public enum ConnectionMode {
private FlowController.LimitExceededBehavior limitExceededBehavior =
FlowController.LimitExceededBehavior.Block;
- private ConnectionMode connectionMode = ConnectionMode.SINGLE_TABLE;
-
private String traceId = null;
private TableSchema updatedTableSchema = null;
private String location;
+ private boolean enableConnectionPool = false;
+
private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
@@ -419,8 +415,16 @@ public Builder setEndpoint(String endpoint) {
return this;
}
- public Builder enableConnectionPool() {
- this.connectionMode = ConnectionMode.MULTIPLEXING;
+ /**
+ * Enable multiplexing for this writer. In multiplexing mode, tables will share the same
+ * connection if possible until the connection is overwhelmed. This feature is still under
+ * development; please contact the BigQuery Write API team before using it.
+ *
+ * @param enableConnectionPool whether to enable the shared connection pool
+ * @return Builder
+ */
+ public Builder setEnableConnectionPool(boolean enableConnectionPool) {
+ this.enableConnectionPool = enableConnectionPool;
return this;
}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java
index 8b865eb13a..fa551c0a6b 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java
@@ -146,8 +146,8 @@ private void testSendRequestsToMultiTable(
throws IOException, ExecutionException, InterruptedException {
ConnectionWorkerPool.setOptions(
Settings.builder()
- .setMinConnectionsPerPool(2)
- .setMaxConnectionsPerPool(maxConnections)
+ .setMinConnectionsPerRegion(2)
+ .setMaxConnectionsPerRegion(maxConnections)
.build());
ConnectionWorkerPool connectionWorkerPool =
createConnectionWorkerPool(maxRequests, /*maxBytes=*/ 100000);
@@ -201,7 +201,7 @@ private void testSendRequestsToMultiTable(
@Test
public void testMultiStreamClosed_multiplexingEnabled() throws Exception {
ConnectionWorkerPool.setOptions(
- Settings.builder().setMaxConnectionsPerPool(10).setMinConnectionsPerPool(5).build());
+ Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
ConnectionWorkerPool connectionWorkerPool =
createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 1000);
@@ -250,7 +250,7 @@ public void testMultiStreamClosed_multiplexingEnabled() throws Exception {
@Test
public void testMultiStreamAppend_appendWhileClosing() throws Exception {
ConnectionWorkerPool.setOptions(
- Settings.builder().setMaxConnectionsPerPool(10).setMinConnectionsPerPool(5).build());
+ Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
ConnectionWorkerPool connectionWorkerPool =
createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 100000);
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
index 04725ba97b..8ceeff4daf 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
@@ -96,7 +96,7 @@ private StreamWriter getMultiplexingTestStreamWriter() throws IOException {
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.setLocation("US")
- .enableConnectionPool()
+ .setEnableConnectionPool(true)
.build();
}