From 5a27e3ead23c1449f7a909132ef57ecbf3e6365e Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Fri, 26 May 2023 12:52:42 -0700 Subject: [PATCH] services, xds, orca: support EPS in client-side WRR (#10177) --- .../main/java/io/grpc/internal/JsonUtil.java | 27 +++ .../java/io/grpc/internal/JsonUtilTest.java | 28 +++ repositories.bzl | 6 +- .../io/grpc/services/CallMetricRecorder.java | 24 ++- .../services/InternalCallMetricRecorder.java | 5 +- .../java/io/grpc/services/MetricRecorder.java | 22 +- .../grpc/services/MetricRecorderHelper.java | 6 +- .../java/io/grpc/services/MetricReport.java | 9 +- .../grpc/services/CallMetricRecorderTest.java | 10 + .../grpc/xds/LoadBalancerConfigFactory.java | 11 +- .../xds/WeightedRoundRobinLoadBalancer.java | 81 ++++++-- ...eightedRoundRobinLoadBalancerProvider.java | 4 + .../OrcaMetricReportingServerInterceptor.java | 5 + .../io/grpc/xds/orca/OrcaPerRequestUtil.java | 2 +- .../io/grpc/xds/orca/OrcaServiceImpl.java | 1 + .../xds/LoadBalancerConfigFactoryTest.java | 6 +- ...tedRoundRobinLoadBalancerProviderTest.java | 5 +- .../WeightedRoundRobinLoadBalancerTest.java | 191 +++++++++++++----- .../io/grpc/xds/XdsClientImplDataTest.java | 4 + ...aMetricReportingServerInterceptorTest.java | 9 + .../grpc/xds/orca/OrcaPerRequestUtilTest.java | 2 + .../io/grpc/xds/orca/OrcaServiceImplTest.java | 8 +- 22 files changed, 377 insertions(+), 89 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/JsonUtil.java b/core/src/main/java/io/grpc/internal/JsonUtil.java index 65f7cf5649e..44cb22abda5 100644 --- a/core/src/main/java/io/grpc/internal/JsonUtil.java +++ b/core/src/main/java/io/grpc/internal/JsonUtil.java @@ -121,6 +121,33 @@ public static Double getNumberAsDouble(Map obj, String key) { String.format("value '%s' for key '%s' in '%s' is not a number", value, key, obj)); } + /** + * Gets a number from an object for the given key. If the key is not present, this returns null. + * If the value does not represent a float, throws an exception. + */ + @Nullable + public static Float getNumberAsFloat(Map obj, String key) { + assert key != null; + if (!obj.containsKey(key)) { + return null; + } + Object value = obj.get(key); + if (value instanceof Float) { + return (Float) value; + } + if (value instanceof String) { + try { + return Float.parseFloat((String) value); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + String.format("string value '%s' for key '%s' cannot be parsed as a float", value, + key)); + } + } + throw new IllegalArgumentException( + String.format("value %s for key '%s' is not a float", value, key)); + } + /** * Gets a number from an object for the given key, casted to an integer. If the key is not * present, this returns null. If the value does not represent an integer, throws an exception. diff --git a/core/src/test/java/io/grpc/internal/JsonUtilTest.java b/core/src/test/java/io/grpc/internal/JsonUtilTest.java index a01f8868220..058171814ea 100644 --- a/core/src/test/java/io/grpc/internal/JsonUtilTest.java +++ b/core/src/test/java/io/grpc/internal/JsonUtilTest.java @@ -38,16 +38,19 @@ public void getNumber() { map.put("key_string_nan", "NaN"); map.put("key_number_5.5", 5.5D); map.put("key_string_six", "six"); + map.put("key_number_7", 7F); map.put("key_string_infinity", "Infinity"); map.put("key_string_minus_infinity", "-Infinity"); map.put("key_string_exponent", "2.998e8"); map.put("key_string_minus_zero", "-0"); + map.put("key_string_boolean", true); assertThat(JsonUtil.getNumberAsDouble(map, "key_number_1")).isEqualTo(1D); assertThat(JsonUtil.getNumberAsInteger(map, "key_number_1")).isEqualTo(1); assertThat(JsonUtil.getNumberAsLong(map, "key_number_1")).isEqualTo(1L); assertThat(JsonUtil.getNumberAsDouble(map, "key_string_2.0")).isEqualTo(2D); + assertThat(JsonUtil.getNumberAsFloat(map, "key_string_2.0")).isEqualTo(2F); try { JsonUtil.getNumberAsInteger(map, "key_string_2.0"); fail("expecting to throw but did not"); @@ -66,8 +69,10 @@ public void getNumber() { assertThat(JsonUtil.getNumberAsDouble(map, "key_string_3")).isEqualTo(3D); assertThat(JsonUtil.getNumberAsInteger(map, "key_string_3")).isEqualTo(3); assertThat(JsonUtil.getNumberAsLong(map, "key_string_3")).isEqualTo(3L); + assertThat(JsonUtil.getNumberAsFloat(map, "key_string_3")).isEqualTo(3F); assertThat(JsonUtil.getNumberAsDouble(map, "key_string_nan")).isNaN(); + assertThat(JsonUtil.getNumberAsFloat(map, "key_string_nan")).isNaN(); try { JsonUtil.getNumberAsInteger(map, "key_string_nan"); fail("expecting to throw but did not"); @@ -118,18 +123,41 @@ public void getNumber() { assertThat(e).hasMessageThat().isEqualTo( "value 'six' for key 'key_string_six' is not a long integer"); } + try { + JsonUtil.getNumberAsFloat(map, "key_string_six"); + fail("expecting to throw but did not"); + } catch (RuntimeException e) { + assertThat(e).hasMessageThat().isEqualTo( + "string value 'six' for key 'key_string_six' cannot be parsed as a float"); + } + + assertThat(JsonUtil.getNumberAsFloat(map, "key_number_7")).isEqualTo(7F); assertThat(JsonUtil.getNumberAsDouble(map, "key_string_infinity")).isPositiveInfinity(); assertThat(JsonUtil.getNumberAsDouble(map, "key_string_minus_infinity")).isNegativeInfinity(); assertThat(JsonUtil.getNumberAsDouble(map, "key_string_exponent")).isEqualTo(2.998e8D); + assertThat(JsonUtil.getNumberAsFloat(map, "key_string_infinity")).isPositiveInfinity(); + assertThat(JsonUtil.getNumberAsFloat(map, "key_string_minus_infinity")).isNegativeInfinity(); + assertThat(JsonUtil.getNumberAsFloat(map, "key_string_exponent")).isEqualTo(2.998e8F); + assertThat(JsonUtil.getNumberAsDouble(map, "key_string_minus_zero")).isZero(); assertThat(JsonUtil.getNumberAsInteger(map, "key_string_minus_zero")).isEqualTo(0); assertThat(JsonUtil.getNumberAsLong(map, "key_string_minus_zero")).isEqualTo(0L); + assertThat(JsonUtil.getNumberAsFloat(map, "key_string_minus_zero")).isZero(); assertThat(JsonUtil.getNumberAsDouble(map, "key_nonexistent")).isNull(); assertThat(JsonUtil.getNumberAsInteger(map, "key_nonexistent")).isNull(); assertThat(JsonUtil.getNumberAsLong(map, "key_nonexistent")).isNull(); + assertThat(JsonUtil.getNumberAsFloat(map, "key_nonexistent")).isNull(); + + try { + JsonUtil.getNumberAsFloat(map, "key_string_boolean"); + fail("expecting to throw but did not"); + } catch (RuntimeException e) { + assertThat(e).hasMessageThat().isEqualTo( + "value true for key 'key_string_boolean' is not a float"); + } } @Test diff --git a/repositories.bzl b/repositories.bzl index 838081f87c9..884f9041cfc 100644 --- a/repositories.bzl +++ b/repositories.bzl @@ -86,10 +86,10 @@ def grpc_java_repositories(): if not native.existing_rule("com_github_cncf_xds"): http_archive( name = "com_github_cncf_xds", - strip_prefix = "xds-06c439db220b89134a8a49bad41994560d6537c6", - sha256 = "41ea212940ab44bf7f8a8b4169cfbc612ed2166dafabc0a56a8820ef665fc6a4", + strip_prefix = "xds-32f1caf87195bf3390061c29f18987e51ca56a88", + sha256 = "fcd0b50c013452fda9c5e28c131c287b655ebb361271a76ad3bffc08b3ecd82e", urls = [ - "https://github.com/cncf/xds/archive/06c439db220b89134a8a49bad41994560d6537c6.tar.gz", + "https://github.com/cncf/xds/archive/32f1caf87195bf3390061c29f18987e51ca56a88.tar.gz", ], ) if not native.existing_rule("com_github_grpc_grpc"): diff --git a/services/src/main/java/io/grpc/services/CallMetricRecorder.java b/services/src/main/java/io/grpc/services/CallMetricRecorder.java index 8570a989f26..de8aa48d300 100644 --- a/services/src/main/java/io/grpc/services/CallMetricRecorder.java +++ b/services/src/main/java/io/grpc/services/CallMetricRecorder.java @@ -44,6 +44,7 @@ public final class CallMetricRecorder { private double cpuUtilizationMetric = 0; private double memoryUtilizationMetric = 0; private double qps = 0; + private double eps = 0; private volatile boolean disabled; /** @@ -160,8 +161,8 @@ public CallMetricRecorder recordMemoryUtilizationMetric(double value) { } /** - * Records a call metric measurement for qps in the range [0, inf). Values outside the valid range - * are ignored. If RPC has already finished, this method is no-op. + * Records a call metric measurement for queries per second (qps) in the range [0, inf). Values + * outside the valid range are ignored. If RPC has already finished, this method is no-op. * *

A latter record will overwrite its former name-sakes. * @@ -169,13 +170,28 @@ public CallMetricRecorder recordMemoryUtilizationMetric(double value) { * @since 1.54.0 */ public CallMetricRecorder recordQpsMetric(double value) { - if (disabled || !MetricRecorderHelper.isQpsValid(value)) { + if (disabled || !MetricRecorderHelper.isRateValid(value)) { return this; } qps = value; return this; } + /** + * Records a call metric measurement for errors per second (eps) in the range [0, inf). Values + * outside the valid range are ignored. If RPC has already finished, this method is no-op. + * + *

A latter record will overwrite its former name-sakes. + * + * @return this recorder object + */ + public CallMetricRecorder recordEpsMetric(double value) { + if (disabled || !MetricRecorderHelper.isRateValid(value)) { + return this; + } + eps = value; + return this; + } /** * Returns all request cost metric values. No more metric values will be recorded after this @@ -205,7 +221,7 @@ MetricReport finalizeAndDump2() { if (savedUtilizationMetrics == null) { savedUtilizationMetrics = Collections.emptyMap(); } - return new MetricReport(cpuUtilizationMetric, memoryUtilizationMetric, qps, + return new MetricReport(cpuUtilizationMetric, memoryUtilizationMetric, qps, eps, Collections.unmodifiableMap(savedRequestCostMetrics), Collections.unmodifiableMap(savedUtilizationMetrics) ); diff --git a/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java b/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java index 6cee9048c4c..5865357aabf 100644 --- a/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java +++ b/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java @@ -46,8 +46,9 @@ public static MetricReport finalizeAndDump2(CallMetricRecorder recorder) { } public static MetricReport createMetricReport(double cpuUtilization, double memoryUtilization, - double qps, Map requestCostMetrics, Map utilizationMetrics) { - return new MetricReport(cpuUtilization, memoryUtilization, qps, requestCostMetrics, + double qps, double eps, Map requestCostMetrics, + Map utilizationMetrics) { + return new MetricReport(cpuUtilization, memoryUtilization, qps, eps, requestCostMetrics, utilizationMetrics); } } diff --git a/services/src/main/java/io/grpc/services/MetricRecorder.java b/services/src/main/java/io/grpc/services/MetricRecorder.java index 3027d495460..248cbb13e56 100644 --- a/services/src/main/java/io/grpc/services/MetricRecorder.java +++ b/services/src/main/java/io/grpc/services/MetricRecorder.java @@ -31,6 +31,7 @@ public final class MetricRecorder { private volatile double cpuUtilization; private volatile double memoryUtilization; private volatile double qps; + private volatile double eps; public static MetricRecorder newInstance() { return new MetricRecorder(); @@ -103,7 +104,7 @@ public void clearMemoryUtilizationMetric() { * Update the QPS metrics data in the range [0, inf). Values outside the valid range are ignored. */ public void setQpsMetric(double value) { - if (!MetricRecorderHelper.isQpsValid(value)) { + if (!MetricRecorderHelper.isRateValid(value)) { return; } qps = value; @@ -116,8 +117,25 @@ public void clearQpsMetric() { qps = 0; } + /** + * Update the EPS metrics data in the range [0, inf). Values outside the valid range are ignored. + */ + public void setEpsMetric(double value) { + if (!MetricRecorderHelper.isRateValid(value)) { + return; + } + this.eps = value; + } + + /** + * Clear the EPS metrics data. + */ + public void clearEpsMetric() { + eps = 0; + } + MetricReport getMetricReport() { - return new MetricReport(cpuUtilization, memoryUtilization, qps, + return new MetricReport(cpuUtilization, memoryUtilization, qps, eps, Collections.emptyMap(), Collections.unmodifiableMap(metricsData)); } } diff --git a/services/src/main/java/io/grpc/services/MetricRecorderHelper.java b/services/src/main/java/io/grpc/services/MetricRecorderHelper.java index 94a811f4f10..66439ac6044 100644 --- a/services/src/main/java/io/grpc/services/MetricRecorderHelper.java +++ b/services/src/main/java/io/grpc/services/MetricRecorderHelper.java @@ -39,10 +39,10 @@ static boolean isCpuUtilizationValid(double utilization) { } /** - * Return true if the qps value is in the range [0, inf) and false otherwise. + * Return true if a rate value (such as qps or eps) is in the range [0, inf) and false otherwise. */ - static boolean isQpsValid(double qps) { - return qps >= 0.0; + static boolean isRateValid(double rate) { + return rate >= 0.0; } // Prevent instantiation. diff --git a/services/src/main/java/io/grpc/services/MetricReport.java b/services/src/main/java/io/grpc/services/MetricReport.java index 73aba7a2af9..0ab8a386c10 100644 --- a/services/src/main/java/io/grpc/services/MetricReport.java +++ b/services/src/main/java/io/grpc/services/MetricReport.java @@ -31,15 +31,17 @@ public final class MetricReport { private double cpuUtilization; private double memoryUtilization; private double qps; + private double eps; private Map requestCostMetrics; private Map utilizationMetrics; - MetricReport(double cpuUtilization, double memoryUtilization, double qps, + MetricReport(double cpuUtilization, double memoryUtilization, double qps, double eps, Map requestCostMetrics, Map utilizationMetrics) { this.cpuUtilization = cpuUtilization; this.memoryUtilization = memoryUtilization; this.qps = qps; + this.eps = eps; this.requestCostMetrics = checkNotNull(requestCostMetrics, "requestCostMetrics"); this.utilizationMetrics = checkNotNull(utilizationMetrics, "utilizationMetrics"); } @@ -64,6 +66,10 @@ public double getQps() { return qps; } + public double getEps() { + return eps; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -72,6 +78,7 @@ public String toString() { .add("requestCost", requestCostMetrics) .add("utilization", utilizationMetrics) .add("qps", qps) + .add("eps", eps) .toString(); } } diff --git a/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java b/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java index 03f29a05ef5..439cd7628a3 100644 --- a/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java +++ b/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java @@ -47,6 +47,7 @@ public void dumpDumpsAllSavedMetricValues() { recorder.recordCpuUtilizationMetric(0.1928); recorder.recordMemoryUtilizationMetric(0.474); recorder.recordQpsMetric(2522.54); + recorder.recordEpsMetric(1.618); MetricReport dump = recorder.finalizeAndDump2(); Truth.assertThat(dump.getUtilizationMetrics()) @@ -56,12 +57,16 @@ public void dumpDumpsAllSavedMetricValues() { Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0.1928); Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(0.474); Truth.assertThat(dump.getQps()).isEqualTo(2522.54); + Truth.assertThat(dump.getEps()).isEqualTo(1.618); + Truth.assertThat(dump.toString()).contains("eps=1.618"); } @Test public void noMetricsRecordedAfterSnapshot() { Map initDump = recorder.finalizeAndDump(); recorder.recordUtilizationMetric("cost", 0.154353423); + recorder.recordQpsMetric(3.14159); + recorder.recordEpsMetric(1.618); assertThat(recorder.finalizeAndDump()).isEqualTo(initDump); } @@ -84,12 +89,14 @@ public void noMetricsRecordedIfUtilizationAndQpsAreLessThanLowerBound() { recorder.recordCpuUtilizationMetric(-0.001); recorder.recordMemoryUtilizationMetric(-0.001); recorder.recordQpsMetric(-0.001); + recorder.recordEpsMetric(-0.001); recorder.recordUtilizationMetric("util1", -0.001); MetricReport dump = recorder.finalizeAndDump2(); Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0); Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(0); Truth.assertThat(dump.getQps()).isEqualTo(0); + Truth.assertThat(dump.getEps()).isEqualTo(0); Truth.assertThat(dump.getUtilizationMetrics()).isEmpty(); Truth.assertThat(dump.getRequestCostMetrics()).isEmpty(); } @@ -108,6 +115,8 @@ public void lastValueWinForMetricsWithSameName() { recorder.recordUtilizationMetric("util1", 0.843233); recorder.recordQpsMetric(1928.3); recorder.recordQpsMetric(100.8); + recorder.recordEpsMetric(3.14159); + recorder.recordEpsMetric(1.618); MetricReport dump = recorder.finalizeAndDump2(); Truth.assertThat(dump.getRequestCostMetrics()) @@ -117,6 +126,7 @@ public void lastValueWinForMetricsWithSameName() { .containsExactly("util1", 0.843233); Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0); Truth.assertThat(dump.getQps()).isEqualTo(100.8); + Truth.assertThat(dump.getEps()).isEqualTo(1.618); } @Test diff --git a/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java b/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java index 228b2442eb8..9b1dc722400 100644 --- a/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java +++ b/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java @@ -89,6 +89,8 @@ class LoadBalancerConfigFactory { static final String PICK_FIRST_FIELD_NAME = "pick_first"; static final String SHUFFLE_ADDRESS_LIST_FIELD_NAME = "shuffleAddressList"; + static final String ERROR_UTILIZATION_PENALTY = "errorUtilizationPenalty"; + /** * Factory method for creating a new {link LoadBalancerConfigConverter} for a given xDS {@link * Cluster}. @@ -135,7 +137,8 @@ class LoadBalancerConfigFactory { String weightExpirationPeriod, String oobReportingPeriod, Boolean enableOobLoadReport, - String weightUpdatePeriod) { + String weightUpdatePeriod, + Float errorUtilizationPenalty) { ImmutableMap.Builder configBuilder = ImmutableMap.builder(); if (blackoutPeriod != null) { configBuilder.put(BLACK_OUT_PERIOD, blackoutPeriod); @@ -152,6 +155,9 @@ class LoadBalancerConfigFactory { if (weightUpdatePeriod != null) { configBuilder.put(WEIGHT_UPDATE_PERIOD, weightUpdatePeriod); } + if (errorUtilizationPenalty != null) { + configBuilder.put(ERROR_UTILIZATION_PENALTY, errorUtilizationPenalty); + } return ImmutableMap.of(WeightedRoundRobinLoadBalancerProvider.SCHEME, configBuilder.buildOrThrow()); } @@ -291,7 +297,8 @@ static class LoadBalancingPolicyConverter { ? Durations.toString(wrr.getWeightExpirationPeriod()) : null, wrr.hasOobReportingPeriod() ? Durations.toString(wrr.getOobReportingPeriod()) : null, wrr.hasEnableOobLoadReport() ? wrr.getEnableOobLoadReport().getValue() : null, - wrr.hasWeightUpdatePeriod() ? Durations.toString(wrr.getWeightUpdatePeriod()) : null); + wrr.hasWeightUpdatePeriod() ? Durations.toString(wrr.getWeightUpdatePeriod()) : null, + wrr.hasErrorUtilizationPenalty() ? wrr.getErrorUtilizationPenalty().getValue() : null); } catch (IllegalArgumentException ex) { throw new ResourceInvalidException("Invalid duration in weighted round robin config: " + ex.getMessage()); diff --git a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java index 87593d53241..cf54eb4bb17 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java @@ -40,8 +40,10 @@ import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener; import io.grpc.xds.orca.OrcaPerRequestUtil; import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.PriorityQueue; import java.util.Random; import java.util.concurrent.ScheduledExecutorService; @@ -110,7 +112,8 @@ public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { @Override public RoundRobinPicker createReadyPicker(List activeList) { - return new WeightedRoundRobinPicker(activeList, config.enableOobLoadReport); + return new WeightedRoundRobinPicker(activeList, config.enableOobLoadReport, + config.errorUtilizationPenalty); } private final class UpdateWeightTask implements Runnable { @@ -128,7 +131,8 @@ private void afterAcceptAddresses() { for (Subchannel subchannel : getSubchannels()) { WrrSubchannel weightedSubchannel = (WrrSubchannel) subchannel; if (config.enableOobLoadReport) { - OrcaOobUtil.setListener(weightedSubchannel, weightedSubchannel.oobListener, + OrcaOobUtil.setListener(weightedSubchannel, + weightedSubchannel.new OrcaReportListener(config.errorUtilizationPenalty), OrcaOobUtil.OrcaReportingConfig.newBuilder() .setReportInterval(config.oobReportingPeriodNanos, TimeUnit.NANOSECONDS) .build()); @@ -172,8 +176,6 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) { @VisibleForTesting final class WrrSubchannel extends ForwardingSubchannel { private final Subchannel delegate; - private final OrcaOobReportListener oobListener = this::onLoadReport; - private final OrcaPerRequestReportListener perRpcListener = this::onLoadReport; private volatile long lastUpdated; private volatile long nonEmptySince; private volatile double weight; @@ -182,20 +184,6 @@ final class WrrSubchannel extends ForwardingSubchannel { this.delegate = checkNotNull(delegate, "delegate"); } - @VisibleForTesting - void onLoadReport(MetricReport report) { - double newWeight = report.getCpuUtilization() == 0 ? 0 : - report.getQps() / report.getCpuUtilization(); - if (newWeight == 0) { - return; - } - if (nonEmptySince == infTime) { - nonEmptySince = ticker.nanoTime(); - } - lastUpdated = ticker.nanoTime(); - weight = newWeight; - } - @Override public void start(SubchannelStateListener listener) { delegate().start(new SubchannelStateListener() { @@ -229,19 +217,56 @@ private double getWeight() { protected Subchannel delegate() { return delegate; } + + final class OrcaReportListener implements OrcaPerRequestReportListener, OrcaOobReportListener { + private final float errorUtilizationPenalty; + + OrcaReportListener(float errorUtilizationPenalty) { + this.errorUtilizationPenalty = errorUtilizationPenalty; + } + + @Override + public void onLoadReport(MetricReport report) { + double newWeight = 0; + if (report.getCpuUtilization() > 0 && report.getQps() > 0) { + double penalty = 0; + if (report.getEps() > 0 && errorUtilizationPenalty > 0) { + penalty = report.getEps() / report.getQps() * errorUtilizationPenalty; + } + newWeight = report.getQps() / (report.getCpuUtilization() + penalty); + } + if (newWeight == 0) { + return; + } + if (nonEmptySince == infTime) { + nonEmptySince = ticker.nanoTime(); + } + lastUpdated = ticker.nanoTime(); + weight = newWeight; + } + } } @VisibleForTesting final class WeightedRoundRobinPicker extends RoundRobinPicker { private final List list; + private final Map subchannelToReportListenerMap = + new HashMap<>(); private final boolean enableOobLoadReport; + private final float errorUtilizationPenalty; private volatile EdfScheduler scheduler; - WeightedRoundRobinPicker(List list, boolean enableOobLoadReport) { + WeightedRoundRobinPicker(List list, boolean enableOobLoadReport, + float errorUtilizationPenalty) { checkNotNull(list, "list"); Preconditions.checkArgument(!list.isEmpty(), "empty list"); this.list = list; + for (Subchannel subchannel : list) { + this.subchannelToReportListenerMap.put(subchannel, + ((WrrSubchannel) subchannel).new OrcaReportListener(errorUtilizationPenalty)); + } this.enableOobLoadReport = enableOobLoadReport; + this.errorUtilizationPenalty = errorUtilizationPenalty; updateWeight(); } @@ -251,7 +276,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { if (!enableOobLoadReport) { return PickResult.withSubchannel(subchannel, OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory( - ((WrrSubchannel)subchannel).perRpcListener)); + subchannelToReportListenerMap.getOrDefault(subchannel, + ((WrrSubchannel) subchannel).new OrcaReportListener(errorUtilizationPenalty)))); } else { return PickResult.withSubchannel(subchannel); } @@ -285,6 +311,7 @@ private void updateWeight() { public String toString() { return MoreObjects.toStringHelper(WeightedRoundRobinPicker.class) .add("enableOobLoadReport", enableOobLoadReport) + .add("errorUtilizationPenalty", errorUtilizationPenalty) .add("list", list).toString(); } @@ -304,6 +331,7 @@ public boolean isEquivalentTo(RoundRobinPicker picker) { } // the lists cannot contain duplicate subchannels return enableOobLoadReport == other.enableOobLoadReport + && Float.compare(errorUtilizationPenalty, other.errorUtilizationPenalty) == 0 && list.size() == other.list.size() && new HashSet<>(list).containsAll(other.list); } } @@ -419,6 +447,7 @@ static final class WeightedRoundRobinLoadBalancerConfig { final boolean enableOobLoadReport; final long oobReportingPeriodNanos; final long weightUpdatePeriodNanos; + final float errorUtilizationPenalty; public static Builder newBuilder() { return new Builder(); @@ -428,12 +457,14 @@ private WeightedRoundRobinLoadBalancerConfig(long blackoutPeriodNanos, long weightExpirationPeriodNanos, boolean enableOobLoadReport, long oobReportingPeriodNanos, - long weightUpdatePeriodNanos) { + long weightUpdatePeriodNanos, + float errorUtilizationPenalty) { this.blackoutPeriodNanos = blackoutPeriodNanos; this.weightExpirationPeriodNanos = weightExpirationPeriodNanos; this.enableOobLoadReport = enableOobLoadReport; this.oobReportingPeriodNanos = oobReportingPeriodNanos; this.weightUpdatePeriodNanos = weightUpdatePeriodNanos; + this.errorUtilizationPenalty = errorUtilizationPenalty; } static final class Builder { @@ -442,6 +473,7 @@ static final class Builder { boolean enableOobLoadReport = false; long oobReportingPeriodNanos = 10_000_000_000L; // 10s long weightUpdatePeriodNanos = 1_000_000_000L; // 1s + float errorUtilizationPenalty = 1.0F; private Builder() { @@ -472,10 +504,15 @@ Builder setWeightUpdatePeriodNanos(long weightUpdatePeriodNanos) { return this; } + Builder setErrorUtilizationPenalty(float errorUtilizationPenalty) { + this.errorUtilizationPenalty = errorUtilizationPenalty; + return this; + } + WeightedRoundRobinLoadBalancerConfig build() { return new WeightedRoundRobinLoadBalancerConfig(blackoutPeriodNanos, weightExpirationPeriodNanos, enableOobLoadReport, oobReportingPeriodNanos, - weightUpdatePeriodNanos); + weightUpdatePeriodNanos, errorUtilizationPenalty); } } } diff --git a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancerProvider.java index ceaa4d7e97d..161e7c4ed0c 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancerProvider.java @@ -79,6 +79,7 @@ private ConfigOrError parseLoadBalancingPolicyConfigInternal(Map rawC Long oobReportingPeriodNanos = JsonUtil.getStringAsDuration(rawConfig, "oobReportingPeriod"); Boolean enableOobLoadReport = JsonUtil.getBoolean(rawConfig, "enableOobLoadReport"); Long weightUpdatePeriodNanos = JsonUtil.getStringAsDuration(rawConfig, "weightUpdatePeriod"); + Float errorUtilizationPenalty = JsonUtil.getNumberAsFloat(rawConfig, "errorUtilizationPenalty"); WeightedRoundRobinLoadBalancerConfig.Builder configBuilder = WeightedRoundRobinLoadBalancerConfig.newBuilder(); @@ -100,6 +101,9 @@ private ConfigOrError parseLoadBalancingPolicyConfigInternal(Map rawC configBuilder.setWeightUpdatePeriodNanos(MIN_WEIGHT_UPDATE_PERIOD_NANOS); } } + if (errorUtilizationPenalty != null) { + configBuilder.setErrorUtilizationPenalty(errorUtilizationPenalty); + } return ConfigOrError.fromConfig(configBuilder.build()); } } diff --git a/xds/src/main/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptor.java b/xds/src/main/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptor.java index f6d8dcbfb7e..2470202b2af 100644 --- a/xds/src/main/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptor.java +++ b/xds/src/main/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptor.java @@ -117,6 +117,7 @@ private static OrcaLoadReport.Builder fromInternalReport(MetricReport internalRe .setCpuUtilization(internalReport.getCpuUtilization()) .setMemUtilization(internalReport.getMemoryUtilization()) .setRpsFractional(internalReport.getQps()) + .setEps(internalReport.getEps()) .putAllUtilization(internalReport.getUtilizationMetrics()) .putAllRequestCost(internalReport.getRequestCostMetrics()); } @@ -145,6 +146,10 @@ private static void mergeMetrics( if (isReportValueSet(rps)) { metricRecorderReportBuilder.setRpsFractional(rps); } + double eps = callMetricRecorderReport.getEps(); + if (isReportValueSet(eps)) { + metricRecorderReportBuilder.setEps(eps); + } } private static boolean isReportValueSet(double value) { diff --git a/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java b/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java index 97414529678..2778cfdf9ea 100644 --- a/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java +++ b/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java @@ -254,7 +254,7 @@ public void inboundTrailers(Metadata trailers) { static MetricReport fromOrcaLoadReport(OrcaLoadReport loadReport) { return InternalCallMetricRecorder.createMetricReport(loadReport.getCpuUtilization(), - loadReport.getMemUtilization(), loadReport.getRpsFractional(), + loadReport.getMemUtilization(), loadReport.getRpsFractional(), loadReport.getEps(), loadReport.getRequestCostMap(), loadReport.getUtilizationMap()); } diff --git a/xds/src/main/java/io/grpc/xds/orca/OrcaServiceImpl.java b/xds/src/main/java/io/grpc/xds/orca/OrcaServiceImpl.java index 30522a5e0f6..ce92225cb1e 100644 --- a/xds/src/main/java/io/grpc/xds/orca/OrcaServiceImpl.java +++ b/xds/src/main/java/io/grpc/xds/orca/OrcaServiceImpl.java @@ -151,6 +151,7 @@ private OrcaLoadReport generateMetricsReport() { return OrcaLoadReport.newBuilder().setCpuUtilization(internalReport.getCpuUtilization()) .setMemUtilization(internalReport.getMemoryUtilization()) .setRpsFractional(internalReport.getQps()) + .setEps(internalReport.getEps()) .putAllUtilization(internalReport.getUtilizationMetrics()) .build(); } diff --git a/xds/src/test/java/io/grpc/xds/LoadBalancerConfigFactoryTest.java b/xds/src/test/java/io/grpc/xds/LoadBalancerConfigFactoryTest.java index db69584901d..fe500105bc6 100644 --- a/xds/src/test/java/io/grpc/xds/LoadBalancerConfigFactoryTest.java +++ b/xds/src/test/java/io/grpc/xds/LoadBalancerConfigFactoryTest.java @@ -26,6 +26,7 @@ import com.google.protobuf.Any; import com.google.protobuf.BoolValue; import com.google.protobuf.Duration; +import com.google.protobuf.FloatValue; import com.google.protobuf.Struct; import com.google.protobuf.UInt32Value; import com.google.protobuf.UInt64Value; @@ -95,6 +96,8 @@ public class LoadBalancerConfigFactoryTest { .setBlackoutPeriod(Duration.newBuilder().setSeconds(287).build()) .setEnableOobLoadReport( BoolValue.newBuilder().setValue(true).build()) + .setErrorUtilizationPenalty( + FloatValue.newBuilder().setValue(1.75F).build()) .build())) .build()) .build(); @@ -125,7 +128,8 @@ public class LoadBalancerConfigFactoryTest { private static final LbConfig VALID_WRR_CONFIG = new LbConfig("wrr_locality_experimental", ImmutableMap.of("childPolicy", ImmutableList.of( ImmutableMap.of("weighted_round_robin", - ImmutableMap.of("blackoutPeriod","287s", "enableOobLoadReport", true ))))); + ImmutableMap.of("blackoutPeriod","287s", "enableOobLoadReport", true, + "errorUtilizationPenalty", 1.75F ))))); private static final LbConfig VALID_RING_HASH_CONFIG = new LbConfig("ring_hash_experimental", ImmutableMap.of("minRingSize", (double) RING_HASH_MIN_RING_SIZE, "maxRingSize", (double) RING_HASH_MAX_RING_SIZE)); diff --git a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerProviderTest.java b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerProviderTest.java index db72d855258..ddde84ca842 100644 --- a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerProviderTest.java +++ b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerProviderTest.java @@ -78,7 +78,8 @@ public void parseLoadBalancingConfig() throws IOException { + " \"weightExpirationPeriod\" : \"300s\"," + " \"oobReportingPeriod\" : \"100s\"," + " \"enableOobLoadReport\" : true," - + " \"weightUpdatePeriod\" : \"2s\"" + + " \"weightUpdatePeriod\" : \"2s\"," + + " \"errorUtilizationPenalty\" : \"1.75\"" + " }"; ConfigOrError configOrError = provider.parseLoadBalancingPolicyConfig( @@ -91,6 +92,7 @@ public void parseLoadBalancingConfig() throws IOException { assertThat(config.oobReportingPeriodNanos).isEqualTo(100_000_000_000L); assertThat(config.enableOobLoadReport).isEqualTo(true); assertThat(config.weightUpdatePeriodNanos).isEqualTo(2_000_000_000L); + assertThat(config.errorUtilizationPenalty).isEqualTo(1.75F); } @Test @@ -106,6 +108,7 @@ public void parseLoadBalancingConfigDefaultValues() throws IOException { assertThat(config.weightExpirationPeriodNanos).isEqualTo(180_000_000_000L); assertThat(config.enableOobLoadReport).isEqualTo(false); assertThat(config.weightUpdatePeriodNanos).isEqualTo(100_000_000L); + assertThat(config.errorUtilizationPenalty).isEqualTo(1.0F); } diff --git a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java index da298d3abbb..011bdbb3427 100644 --- a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java @@ -205,12 +205,19 @@ public void wrrLifeCycle() { assertThat(weightedPicker.getList().size()).isEqualTo(1); weightedPicker = (WeightedRoundRobinPicker) pickerCaptor.getAllValues().get(1); assertThat(weightedPicker.getList().size()).isEqualTo(2); + String weightedPickerStr = weightedPicker.toString(); + assertThat(weightedPickerStr).contains("enableOobLoadReport=false"); + assertThat(weightedPickerStr).contains("errorUtilizationPenalty=1.0"); + assertThat(weightedPickerStr).contains("list="); + WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); - weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, 1, new HashMap<>(), new HashMap<>())); - weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.2, 0.1, 1, new HashMap<>(), new HashMap<>())); + weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1); assertThat(weightedPicker.pickSubchannel(mockArgs) .getSubchannel()).isEqualTo(weightedSubchannel1); @@ -251,10 +258,12 @@ public void enableOobLoadReportConfig() { (WeightedRoundRobinPicker) pickerCaptor.getAllValues().get(1); WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); - weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, 1, new HashMap<>(), new HashMap<>())); - weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.9, 0.1, 1, new HashMap<>(), new HashMap<>())); + weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.9, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1); PickResult pickResult = weightedPicker.pickSubchannel(mockArgs); assertThat(pickResult.getSubchannel()).isEqualTo(weightedSubchannel1); @@ -307,9 +316,12 @@ private void pickByWeight(MetricReport r1, MetricReport r2, MetricReport r3, WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); WrrSubchannel weightedSubchannel3 = (WrrSubchannel) weightedPicker.getList().get(2); - weightedSubchannel1.onLoadReport(r1); - weightedSubchannel2.onLoadReport(r2); - weightedSubchannel3.onLoadReport(r3); + weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + r1); + weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + r2); + weightedSubchannel3.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + r3); assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1); Map pickCount = new HashMap<>(); for (int i = 0; i < 10000; i++) { @@ -328,31 +340,102 @@ private void pickByWeight(MetricReport r1, MetricReport r2, MetricReport r3, @Test public void pickByWeight_LargeWeight() { MetricReport report1 = InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, 999, new HashMap<>(), new HashMap<>()); + 0.1, 0.1, 999, 0, new HashMap<>(), new HashMap<>()); MetricReport report2 = InternalCallMetricRecorder.createMetricReport( - 0.9, 0.1, 2, new HashMap<>(), new HashMap<>()); + 0.9, 0.1, 2, 0, new HashMap<>(), new HashMap<>()); MetricReport report3 = InternalCallMetricRecorder.createMetricReport( - 0.86, 0.1, 100, new HashMap<>(), new HashMap<>()); + 0.86, 0.1, 100, 0, new HashMap<>(), new HashMap<>()); double totalWeight = 999 / 0.1 + 2 / 0.9 + 100 / 0.86; pickByWeight(report1, report2, report3, 999 / 0.1 / totalWeight, 2 / 0.9 / totalWeight, 100 / 0.86 / totalWeight); } + @Test + public void pickByWeight_largeWeight_withEps_defaultErrorUtilizationPenalty() { + MetricReport report1 = InternalCallMetricRecorder.createMetricReport( + 0.1, 0.1, 999, 13, new HashMap<>(), new HashMap<>()); + MetricReport report2 = InternalCallMetricRecorder.createMetricReport( + 0.9, 0.1, 2, 1.8, new HashMap<>(), new HashMap<>()); + MetricReport report3 = InternalCallMetricRecorder.createMetricReport( + 0.86, 0.1, 100, 3, new HashMap<>(), new HashMap<>()); + double weight1 = 999 / (0.1 + 13 / 999F * weightedConfig.errorUtilizationPenalty); + double weight2 = 2 / (0.9 + 1.8 / 2F * weightedConfig.errorUtilizationPenalty); + double weight3 = 100 / (0.86 + 3 / 100F * weightedConfig.errorUtilizationPenalty); + double totalWeight = weight1 + weight2 + weight3; + + pickByWeight(report1, report2, report3, weight1 / totalWeight, weight2 / totalWeight, + weight3 / totalWeight); + } + @Test public void pickByWeight_normalWeight() { MetricReport report1 = InternalCallMetricRecorder.createMetricReport( - 0.12, 0.1, 22, new HashMap<>(), new HashMap<>()); + 0.12, 0.1, 22, 0, new HashMap<>(), new HashMap<>()); MetricReport report2 = InternalCallMetricRecorder.createMetricReport( - 0.28, 0.1, 40, new HashMap<>(), new HashMap<>()); + 0.28, 0.1, 40, 0, new HashMap<>(), new HashMap<>()); MetricReport report3 = InternalCallMetricRecorder.createMetricReport( - 0.86, 0.1, 100, new HashMap<>(), new HashMap<>()); + 0.86, 0.1, 100, 0, new HashMap<>(), new HashMap<>()); double totalWeight = 22 / 0.12 + 40 / 0.28 + 100 / 0.86; pickByWeight(report1, report2, report3, 22 / 0.12 / totalWeight, 40 / 0.28 / totalWeight, 100 / 0.86 / totalWeight ); } + @Test + public void pickByWeight_normalWeight_withEps_defaultErrorUtilizationPenalty() { + MetricReport report1 = InternalCallMetricRecorder.createMetricReport( + 0.12, 0.1, 22, 19.7, new HashMap<>(), new HashMap<>()); + MetricReport report2 = InternalCallMetricRecorder.createMetricReport( + 0.28, 0.1, 40, 0.998, new HashMap<>(), new HashMap<>()); + MetricReport report3 = InternalCallMetricRecorder.createMetricReport( + 0.86, 0.1, 100, 3.14159, new HashMap<>(), new HashMap<>()); + double weight1 = 22 / (0.12 + 19.7 / 22F * weightedConfig.errorUtilizationPenalty); + double weight2 = 40 / (0.28 + 0.998 / 40F * weightedConfig.errorUtilizationPenalty); + double weight3 = 100 / (0.86 + 3.14159 / 100F * weightedConfig.errorUtilizationPenalty); + double totalWeight = weight1 + weight2 + weight3; + + pickByWeight(report1, report2, report3, weight1 / totalWeight, weight2 / totalWeight, + weight3 / totalWeight); + } + + @Test + public void pickByWeight_normalWeight_withEps_customErrorUtilizationPenalty() { + weightedConfig = WeightedRoundRobinLoadBalancerConfig.newBuilder() + .setErrorUtilizationPenalty(1.75F).build(); + + MetricReport report1 = InternalCallMetricRecorder.createMetricReport( + 0.12, 0.1, 22, 19.7, new HashMap<>(), new HashMap<>()); + MetricReport report2 = InternalCallMetricRecorder.createMetricReport( + 0.28, 0.1, 40, 0.998, new HashMap<>(), new HashMap<>()); + MetricReport report3 = InternalCallMetricRecorder.createMetricReport( + 0.86, 0.1, 100, 3.14159, new HashMap<>(), new HashMap<>()); + double weight1 = 22 / (0.12 + 19.7 / 22F * weightedConfig.errorUtilizationPenalty); + double weight2 = 40 / (0.28 + 0.998 / 40F * weightedConfig.errorUtilizationPenalty); + double weight3 = 100 / (0.86 + 3.14159 / 100F * weightedConfig.errorUtilizationPenalty); + double totalWeight = weight1 + weight2 + weight3; + + pickByWeight(report1, report2, report3, weight1 / totalWeight, weight2 / totalWeight, + weight3 / totalWeight); + } + + @Test + public void pickByWeight_avgWeight_zeroCpuUtilization_withEps_customErrorUtilizationPenalty() { + weightedConfig = WeightedRoundRobinLoadBalancerConfig.newBuilder() + .setErrorUtilizationPenalty(1.75F).build(); + + MetricReport report1 = InternalCallMetricRecorder.createMetricReport( + 0, 0.1, 22, 19.7, new HashMap<>(), new HashMap<>()); + MetricReport report2 = InternalCallMetricRecorder.createMetricReport( + 0, 0.1, 40, 0.998, new HashMap<>(), new HashMap<>()); + MetricReport report3 = InternalCallMetricRecorder.createMetricReport( + 0, 0.1, 100, 3.14159, new HashMap<>(), new HashMap<>()); + double avgSubchannelPickRatio = 1.0 / 3; + + pickByWeight(report1, report2, report3, avgSubchannelPickRatio, avgSubchannelPickRatio, + avgSubchannelPickRatio); + } + @Test public void emptyConfig() { assertThat(wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder() @@ -395,10 +478,12 @@ public void blackoutPeriod() { (WeightedRoundRobinPicker) pickerCaptor.getAllValues().get(1); WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); - weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, 1, new HashMap<>(), new HashMap<>())); - weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.2, 0.1, 1, new HashMap<>(), new HashMap<>())); + weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(5, TimeUnit.SECONDS)).isEqualTo(1); Map pickCount = new HashMap<>(); for (int i = 0; i < 1000; i++) { @@ -453,10 +538,12 @@ public void updateWeightTimer() { assertThat(weightedPicker.getList().size()).isEqualTo(2); WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); - weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, 1, new HashMap<>(), new HashMap<>())); - weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.2, 0.1, 1, new HashMap<>(), new HashMap<>())); + weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1); assertThat(weightedPicker.pickSubchannel(mockArgs) .getSubchannel()).isEqualTo(weightedSubchannel1); @@ -468,10 +555,12 @@ public void updateWeightTimer() { .setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig) .setAttributes(affinity).build())); assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1); - weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.2, 0.1, 1, new HashMap<>(), new HashMap<>())); - weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, 1, new HashMap<>(), new HashMap<>())); + weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); //timer fires, new weight updated assertThat(fakeClock.forwardTime(500, TimeUnit.MILLISECONDS)).isEqualTo(1); assertThat(weightedPicker.pickSubchannel(mockArgs) @@ -500,10 +589,12 @@ public void weightExpired() { (WeightedRoundRobinPicker) pickerCaptor.getAllValues().get(1); WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); - weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, 1, new HashMap<>(), new HashMap<>())); - weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.2, 0.1, 1, new HashMap<>(), new HashMap<>())); + weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1); Map pickCount = new HashMap<>(); for (int i = 0; i < 1000; i++) { @@ -562,8 +653,10 @@ public void rrFallback() { pickCount.getOrDefault(pickResult.getSubchannel(), 0) + 1); assertThat(pickResult.getStreamTracerFactory()).isNotNull(); WrrSubchannel subchannel = (WrrSubchannel)pickResult.getSubchannel(); - subchannel.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, qpsByChannel.get(subchannel), new HashMap<>(), new HashMap<>())); + subchannel.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.1, 0.1, qpsByChannel.get(subchannel), 0, + new HashMap<>(), new HashMap<>())); } assertThat(Math.abs(pickCount.get(weightedSubchannel1) / 1000.0 - 1.0 / 2)) .isAtMost(0.1); @@ -575,9 +668,11 @@ public void rrFallback() { pickCount.put(pickResult.getSubchannel(), pickCount.getOrDefault(pickResult.getSubchannel(), 0) + 1); assertThat(pickResult.getStreamTracerFactory()).isNotNull(); - WrrSubchannel subchannel = (WrrSubchannel)pickResult.getSubchannel(); - subchannel.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, qpsByChannel.get(subchannel), new HashMap<>(), new HashMap<>())); + WrrSubchannel subchannel = (WrrSubchannel) pickResult.getSubchannel(); + subchannel.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.1, 0.1, qpsByChannel.get(subchannel), 0, + new HashMap<>(), new HashMap<>())); fakeClock.forwardTime(50, TimeUnit.MILLISECONDS); } assertThat(pickCount.size()).isEqualTo(2); @@ -613,10 +708,12 @@ public void unknownWeightIsAvgWeight() { WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); WrrSubchannel weightedSubchannel3 = (WrrSubchannel) weightedPicker.getList().get(2); - weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, 1, new HashMap<>(), new HashMap<>())); - weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.2, 0.1, 1, new HashMap<>(), new HashMap<>())); + weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1); Map pickCount = new HashMap<>(); for (int i = 0; i < 1000; i++) { @@ -655,10 +752,12 @@ public void pickFromOtherThread() throws Exception { (WeightedRoundRobinPicker) pickerCaptor.getAllValues().get(1); WrrSubchannel weightedSubchannel1 = (WrrSubchannel) weightedPicker.getList().get(0); WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); - weightedSubchannel1.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.1, 0.1, 1, new HashMap<>(), new HashMap<>())); - weightedSubchannel2.onLoadReport(InternalCallMetricRecorder.createMetricReport( - 0.2, 0.1, 1, new HashMap<>(), new HashMap<>())); + weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.1, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( + InternalCallMetricRecorder.createMetricReport( + 0.2, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); CyclicBarrier barrier = new CyclicBarrier(2); Map pickCount = new ConcurrentHashMap<>(); pickCount.put(weightedSubchannel1, new AtomicInteger(0)); diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplDataTest.java b/xds/src/test/java/io/grpc/xds/XdsClientImplDataTest.java index 90a7e4a58d7..f26f1eba879 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplDataTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplDataTest.java @@ -28,6 +28,7 @@ import com.google.protobuf.BoolValue; import com.google.protobuf.ByteString; import com.google.protobuf.Duration; +import com.google.protobuf.FloatValue; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import com.google.protobuf.StringValue; @@ -2003,6 +2004,8 @@ public void parseCluster_WrrLbPolicy_defaultLbConfig() throws ResourceInvalidExc .setBlackoutPeriod(Duration.newBuilder().setSeconds(17).build()) .setEnableOobLoadReport( BoolValue.newBuilder().setValue(true).build()) + .setErrorUtilizationPenalty( + FloatValue.newBuilder().setValue(1.75F).build()) .build())) .build()) .build()) @@ -2046,6 +2049,7 @@ public void parseCluster_WrrLbPolicy_defaultLbConfig() throws ResourceInvalidExc assertThat(result.oobReportingPeriodNanos).isEqualTo(10_000_000_000L); assertThat(result.weightUpdatePeriodNanos).isEqualTo(1_000_000_000L); assertThat(result.weightExpirationPeriodNanos).isEqualTo(180_000_000_000L); + assertThat(result.errorUtilizationPenalty).isEqualTo(1.75F); } @Test diff --git a/xds/src/test/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptorTest.java b/xds/src/test/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptorTest.java index 7681f0b42f4..d8a1492f7cf 100644 --- a/xds/src/test/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptorTest.java +++ b/xds/src/test/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptorTest.java @@ -75,6 +75,7 @@ public class OrcaMetricReportingServerInterceptorTest { private double cpuUtilizationMetrics = 0; private double memoryUtilizationMetrics = 0; private double qpsMetrics = 0; + private double epsMetrics = 0; private MetricRecorder metricRecorder; private final AtomicReference trailersCapture = new AtomicReference<>(); @@ -99,6 +100,7 @@ public void unaryRpc( CallMetricRecorder.getCurrent().recordCpuUtilizationMetric(cpuUtilizationMetrics); CallMetricRecorder.getCurrent().recordMemoryUtilizationMetric(memoryUtilizationMetrics); CallMetricRecorder.getCurrent().recordQpsMetric(qpsMetrics); + CallMetricRecorder.getCurrent().recordEpsMetric(epsMetrics); SimpleResponse response = SimpleResponse.newBuilder().setResponseMessage("Simple response").build(); responseObserver.onNext(response); @@ -194,6 +196,7 @@ public void responseTrailersContainAllReportedMetricsFromCallMetricRecorder() { cpuUtilizationMetrics = 0.3465; memoryUtilizationMetrics = 0.764; qpsMetrics = 3.1415926535; + epsMetrics = 1.618; ClientCalls.blockingUnaryCall(channelToUse, SIMPLE_METHOD, CallOptions.DEFAULT, REQUEST); Metadata receivedTrailers = trailersCapture.get(); OrcaLoadReport report = @@ -205,6 +208,7 @@ public void responseTrailersContainAllReportedMetricsFromCallMetricRecorder() { assertThat(report.getCpuUtilization()).isEqualTo(0.3465); assertThat(report.getMemUtilization()).isEqualTo(0.764); assertThat(report.getRpsFractional()).isEqualTo(3.1415926535); + assertThat(report.getEps()).isEqualTo(1.618); } @Test @@ -216,6 +220,7 @@ public void responseTrailersContainMergedMetricsFromCallMetricRecorderAndMetricR memoryUtilizationMetrics = 0.967; metricRecorder.setMemoryUtilizationMetric(0.764); metricRecorder.setQpsMetric(1.618); + metricRecorder.setEpsMetric(3.14159); metricRecorder.putUtilizationMetric("serverUtil1", 0.7467); metricRecorder.putUtilizationMetric("serverUtil2", 0.2233); metricRecorder.putUtilizationMetric("util1", 0.01); @@ -233,14 +238,17 @@ public void responseTrailersContainMergedMetricsFromCallMetricRecorderAndMetricR assertThat(report.getCpuUtilization()).isEqualTo(0.3465); assertThat(report.getMemUtilization()).isEqualTo(0.967); assertThat(report.getRpsFractional()).isEqualTo(1.618); + assertThat(report.getEps()).isEqualTo(3.14159); } @Test public void responseTrailersContainMergedMetricsFromCallMetricRecorderAndMetricRecorderNoMap() { qpsMetrics = 5142.77; + epsMetrics = 2233.88; metricRecorder.setCpuUtilizationMetric(0.314159); metricRecorder.setMemoryUtilizationMetric(0.764); metricRecorder.setQpsMetric(1.618); + metricRecorder.setEpsMetric(3.14159); ClientCalls.blockingUnaryCall(channelToUse, SIMPLE_METHOD, CallOptions.DEFAULT, REQUEST); Metadata receivedTrailers = trailersCapture.get(); @@ -252,6 +260,7 @@ public void responseTrailersContainMergedMetricsFromCallMetricRecorderAndMetricR assertThat(report.getCpuUtilization()).isEqualTo(0.314159); assertThat(report.getMemUtilization()).isEqualTo(0.764); assertThat(report.getRpsFractional()).isEqualTo(5142.77); + assertThat(report.getEps()).isEqualTo(2233.88); } private static final class TrailersCapturingClientInterceptor implements ClientInterceptor { diff --git a/xds/src/test/java/io/grpc/xds/orca/OrcaPerRequestUtilTest.java b/xds/src/test/java/io/grpc/xds/orca/OrcaPerRequestUtilTest.java index 4d0e2070b3b..91f0e1f493a 100644 --- a/xds/src/test/java/io/grpc/xds/orca/OrcaPerRequestUtilTest.java +++ b/xds/src/test/java/io/grpc/xds/orca/OrcaPerRequestUtilTest.java @@ -120,6 +120,8 @@ static boolean reportEqual(MetricReport a, MetricReport b) { return a.getCpuUtilization() == b.getCpuUtilization() && a.getMemoryUtilization() == b.getMemoryUtilization() + && a.getQps() == b.getQps() + && a.getEps() == b.getEps() && Objects.equal(a.getRequestCostMetrics(), b.getRequestCostMetrics()) && Objects.equal(a.getUtilizationMetrics(), b.getUtilizationMetrics()); } diff --git a/xds/src/test/java/io/grpc/xds/orca/OrcaServiceImplTest.java b/xds/src/test/java/io/grpc/xds/orca/OrcaServiceImplTest.java index 124a21ddb76..72c518eb5b2 100644 --- a/xds/src/test/java/io/grpc/xds/orca/OrcaServiceImplTest.java +++ b/xds/src/test/java/io/grpc/xds/orca/OrcaServiceImplTest.java @@ -144,17 +144,19 @@ public void testRequestIntervalLess() { OpenRcaServiceGrpc.getStreamCoreMetricsMethod(), CallOptions.DEFAULT); defaultTestService.putUtilizationMetric("buffer", 0.2); defaultTestService.setQpsMetric(1.9); + defaultTestService.setEpsMetric(0.2233); call.start(listener, new Metadata()); call.sendMessage(OrcaLoadReportRequest.newBuilder() .setReportInterval(Duration.newBuilder().setSeconds(0).setNanos(500).build()).build()); call.halfClose(); call.request(1); OrcaLoadReport expect = OrcaLoadReport.newBuilder().putUtilization("buffer", 0.2) - .setRpsFractional(1.9).build(); + .setRpsFractional(1.9).setEps(0.2233).build(); verify(listener).onMessage(eq(expect)); reset(listener); defaultTestService.removeUtilizationMetric("buffer0"); defaultTestService.clearQpsMetric(); + defaultTestService.clearEpsMetric(); assertThat(fakeClock.forwardTime(500, TimeUnit.NANOSECONDS)).isEqualTo(0); verifyNoInteractions(listener); assertThat(fakeClock.forwardTime(1, TimeUnit.SECONDS)).isEqualTo(1); @@ -250,12 +252,14 @@ public void testApis() throws Exception { .putAllUtilization(firstUtilization) .putUtilization("queue", 1.0) .setRpsFractional(1239.01) + .setEps(1.618) .build(); defaultTestService.setCpuUtilizationMetric(goldenReport.getCpuUtilization()); defaultTestService.setMemoryUtilizationMetric(goldenReport.getMemUtilization()); defaultTestService.setAllUtilizationMetrics(firstUtilization); defaultTestService.putUtilizationMetric("queue", 1.0); defaultTestService.setQpsMetric(1239.01); + defaultTestService.setEpsMetric(1.618); Iterator reports = OpenRcaServiceGrpc.newBlockingStub(channel) .streamCoreMetrics(OrcaLoadReportRequest.newBuilder().build()); assertThat(reports.next()).isEqualTo(goldenReport); @@ -263,6 +267,7 @@ public void testApis() throws Exception { defaultTestService.clearCpuUtilizationMetric(); defaultTestService.clearMemoryUtilizationMetric(); defaultTestService.clearQpsMetric(); + defaultTestService.clearEpsMetric(); fakeClock.forwardTime(1, TimeUnit.SECONDS); goldenReport = OrcaLoadReport.newBuilder() .putAllUtilization(firstUtilization) @@ -279,6 +284,7 @@ public void testApis() throws Exception { defaultTestService.setMemoryUtilizationMetric(-0.001); defaultTestService.setMemoryUtilizationMetric(1.001); defaultTestService.setQpsMetric(-0.001); + defaultTestService.setEpsMetric(-0.001); defaultTestService.putUtilizationMetric("util-out-of-range", -0.001); defaultTestService.putUtilizationMetric("util-out-of-range", 1.001); fakeClock.forwardTime(1, TimeUnit.SECONDS);