From 8a81ad34824d7409c9b2c62594421105296b974e Mon Sep 17 00:00:00 2001 From: Gaole Meng Date: Wed, 14 Sep 2022 13:26:08 -0700 Subject: [PATCH] feat: add Load api for connection worker for multiplexing client --- .../bigquery/storage/v1/ConnectionWorker.java | 76 +++++++++++++++++++ .../storage/v1/ConnectionWorkerTest.java | 56 ++++++++++++++ 2 files changed, 132 insertions(+) create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 36bf7bbaa7..081ab0340b 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -18,16 +18,19 @@ import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.FlowController; +import com.google.auto.value.AutoValue; import com.google.cloud.bigquery.storage.util.Errors; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData; import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback; import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.Int64Value; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import java.io.IOException; +import java.util.Comparator; import java.util.Deque; import java.util.LinkedList; import java.util.UUID; @@ -672,4 +675,77 @@ private static final class AppendRequestAndResponse { this.messageSize = message.getProtoRows().getSerializedSize(); } } + + + /** + * Represent the current workload for this worker. Used for multiplexing algorithm to determine + * the distribution of requests. + */ + @AutoValue + public abstract static class Load { + // Consider the load on this worker to be overwhelmed when above some percentage of + // in-flight bytes or in-flight requests count. + private static double overwhelmedInflightCount = 0.5; + private static double overwhelmedInflightBytes = 0.6; + + // Number of in-flight requests bytes in the worker. + abstract long inFlightRequestsBytes(); + + // Number of in-flight requests count in the worker. + abstract long inFlightRequestsCount(); + + // Number of destination handled by this worker. + abstract long destinationCount(); + + // Max number of in-flight requests count allowed. + abstract long maxInflightBytes(); + + // Max number of in-flight requests bytes allowed. + abstract long maxInflightCount(); + + static Load create( + long inFlightRequestsBytes, + long inFlightRequestsCount, + long destinationCount, + long maxInflightBytes, + long maxInflightCount) { + return new AutoValue_ConnectionWorker_Load( + inFlightRequestsBytes, + inFlightRequestsCount, + destinationCount, + maxInflightBytes, + maxInflightCount); + } + + boolean isOverwhelmed() { + // Consider only in flight bytes and count for now, as by experiment those two are the most + // efficient and has great simplity. + return inFlightRequestsCount() > overwhelmedInflightCount * maxInflightCount() + || inFlightRequestsBytes() > overwhelmedInflightBytes * maxInflightBytes(); + } + + // Compares two different load. First compare in flight request bytes split by size 1024 bucket. + // Then compare the inflight requests count. + // Then compare destination count of the two connections. + public static final Comparator LOAD_COMPARATOR = + Comparator.comparing((Load key) -> (int) (key.inFlightRequestsBytes() / 1024)) + .thenComparing((Load key) -> (int) (key.inFlightRequestsCount() / 100)) + .thenComparing(Load::destinationCount); + + // Compares two different load without bucket, used in smaller scale unit testing. + public static final Comparator TEST_LOAD_COMPARATOR = + Comparator.comparing((Load key) -> (int) key.inFlightRequestsBytes()) + .thenComparing((Load key) -> (int) key.inFlightRequestsCount()) + .thenComparing(Load::destinationCount); + + @VisibleForTesting + public static void setOverwhelmedBytesThreshold(double newThreshold) { + overwhelmedInflightBytes = newThreshold; + } + + @VisibleForTesting + public static void setOverwhelmedCountsThreshold(double newThreshold) { + overwhelmedInflightCount = newThreshold; + } + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java new file mode 100644 index 0000000000..35d8d5cf09 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -0,0 +1,56 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ConnectionWorkerTest { + @Test + public void testLoadCompare_compareLoad() { + // In flight bytes bucket is split as per 1024 requests per bucket. + // When in flight bytes is in lower bucket, even destination count is higher and request count + // is higher, the load is still smaller. + Load load1 = ConnectionWorker.Load.create(1000, 2000, 100, 1000, 10); + Load load2 = ConnectionWorker.Load.create(2000, 1000, 10, 1000, 10); + assertThat(Load.LOAD_COMPARATOR.compare(load1, load2)).isLessThan(0); + + // In flight bytes in the same bucke of request bytes will compare request count. + Load load3 = ConnectionWorker.Load.create(1, 300, 10, 0, 10); + Load load4 = ConnectionWorker.Load.create(10, 1, 10, 0, 10); + assertThat(Load.LOAD_COMPARATOR.compare(load3, load4)).isGreaterThan(0); + + // In flight request and bytes in the same bucket will compare the destination count. + Load load5 = ConnectionWorker.Load.create(200, 1, 10, 1000, 10); + Load load6 = ConnectionWorker.Load.create(100, 10, 10, 1000, 10); + assertThat(Load.LOAD_COMPARATOR.compare(load5, load6) == 0).isTrue(); + } + + @Test + public void testLoadIsOverWhelmed() { + // Only in flight request is considered in current overwhelmed calculation. + Load load1 = ConnectionWorker.Load.create(60, 10, 100, 90, 100); + assertThat(load1.isOverwhelmed()).isTrue(); + + Load load2 = ConnectionWorker.Load.create(1, 1, 100, 100, 100); + assertThat(load2.isOverwhelmed()).isFalse(); + } +}