Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: Add counter and gauge metrics #11661

Merged
merged 13 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions api/src/main/java/io/grpc/NameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@
@Nullable private final ChannelLogger channelLogger;
@Nullable private final Executor executor;
@Nullable private final String overrideAuthority;
@Nullable private final MetricRecorder metricRecorder;

private Args(
Integer defaultPort,
Expand All @@ -297,7 +298,8 @@
@Nullable ScheduledExecutorService scheduledExecutorService,
@Nullable ChannelLogger channelLogger,
@Nullable Executor executor,
@Nullable String overrideAuthority) {
@Nullable String overrideAuthority,
@Nullable MetricRecorder metricRecorder) {
this.defaultPort = checkNotNull(defaultPort, "defaultPort not set");
this.proxyDetector = checkNotNull(proxyDetector, "proxyDetector not set");
this.syncContext = checkNotNull(syncContext, "syncContext not set");
Expand All @@ -306,6 +308,7 @@
this.channelLogger = channelLogger;
this.executor = executor;
this.overrideAuthority = overrideAuthority;
this.metricRecorder = metricRecorder;
}

/**
Expand Down Expand Up @@ -403,6 +406,15 @@
return overrideAuthority;
}

/**
* Returns the {@link MetricRecorder} that the channel uses to record metrics.
*/
@Nullable
@ExperimentalApi("Insert GitHub issue")
public MetricRecorder getMetricRecorder() {
return metricRecorder;
}


@Override
public String toString() {
Expand All @@ -415,6 +427,7 @@
.add("channelLogger", channelLogger)
.add("executor", executor)
.add("overrideAuthority", overrideAuthority)
.add("metricRecorder", metricRecorder)

Check warning on line 430 in api/src/main/java/io/grpc/NameResolver.java

View check run for this annotation

Codecov / codecov/patch

api/src/main/java/io/grpc/NameResolver.java#L430

Added line #L430 was not covered by tests
.toString();
}

Expand All @@ -433,6 +446,7 @@
builder.setChannelLogger(channelLogger);
builder.setOffloadExecutor(executor);
builder.setOverrideAuthority(overrideAuthority);
builder.setMetricRecorder(metricRecorder);
return builder;
}

Expand All @@ -459,6 +473,7 @@
private ChannelLogger channelLogger;
private Executor executor;
private String overrideAuthority;
private MetricRecorder metricRecorder;

Builder() {
}
Expand Down Expand Up @@ -545,6 +560,15 @@
return this;
}

/**
* See {@link Args#getMetricRecorder()}. This is an optional field.
*/
@ExperimentalApi("Insert github issue")
public Builder setMetricRecorder(MetricRecorder metricRecorder) {
this.metricRecorder = metricRecorder;
return this;
}

/**
* Builds an {@link Args}.
*
Expand All @@ -554,7 +578,8 @@
return
new Args(
defaultPort, proxyDetector, syncContext, serviceConfigParser,
scheduledExecutorService, channelLogger, executor, overrideAuthority);
scheduledExecutorService, channelLogger, executor, overrideAuthority,
metricRecorder);
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions api/src/test/java/io/grpc/NameResolverTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class NameResolverTest {
private final ChannelLogger channelLogger = mock(ChannelLogger.class);
private final Executor executor = Executors.newSingleThreadExecutor();
private final String overrideAuthority = "grpc.io";
private final MetricRecorder metricRecorder = new MetricRecorder() {};
@Mock NameResolver.Listener mockListener;

@Test
Expand All @@ -77,6 +78,7 @@ public void args() {
assertThat(args.getChannelLogger()).isSameInstanceAs(channelLogger);
assertThat(args.getOffloadExecutor()).isSameInstanceAs(executor);
assertThat(args.getOverrideAuthority()).isSameInstanceAs(overrideAuthority);
assertThat(args.getMetricRecorder()).isSameInstanceAs(metricRecorder);

NameResolver.Args args2 = args.toBuilder().build();
assertThat(args2.getDefaultPort()).isEqualTo(defaultPort);
Expand All @@ -87,6 +89,7 @@ public void args() {
assertThat(args2.getChannelLogger()).isSameInstanceAs(channelLogger);
assertThat(args2.getOffloadExecutor()).isSameInstanceAs(executor);
assertThat(args2.getOverrideAuthority()).isSameInstanceAs(overrideAuthority);
assertThat(args.getMetricRecorder()).isSameInstanceAs(metricRecorder);

assertThat(args2).isNotSameInstanceAs(args);
assertThat(args2).isNotEqualTo(args);
Expand All @@ -102,6 +105,7 @@ private NameResolver.Args createArgs() {
.setChannelLogger(channelLogger)
.setOffloadExecutor(executor)
.setOverrideAuthority(overrideAuthority)
.setMetricRecorder(metricRecorder)
.build();
}

Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,8 @@ ClientStream newSubstream(
builder.maxHedgedAttempts,
loadBalancerFactory);
this.authorityOverride = builder.authorityOverride;
this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
MetricInstrumentRegistry.getDefaultRegistry());
this.nameResolverArgs =
NameResolver.Args.newBuilder()
.setDefaultPort(builder.getDefaultPort())
Expand All @@ -599,6 +601,7 @@ ClientStream newSubstream(
.setChannelLogger(channelLogger)
.setOffloadExecutor(this.offloadExecutorHolder)
.setOverrideAuthority(this.authorityOverride)
.setMetricRecorder(this.metricRecorder)
.build();
this.nameResolver = getNameResolver(
targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
Expand Down Expand Up @@ -671,8 +674,6 @@ public CallTracer create() {
}
serviceConfigUpdated = true;
}
this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
MetricInstrumentRegistry.getDefaultRegistry());
}

@VisibleForTesting
Expand Down
25 changes: 25 additions & 0 deletions core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,30 @@ public void metricRecorder_recordsToMetricSink() {
eq(optionalLabelValues));
}

@Test
public void metricRecorder_fromNameResolverArgs_recordsToMetricSink() {
MetricSink mockSink1 = mock(MetricSink.class);
MetricSink mockSink2 = mock(MetricSink.class);
channelBuilder.addMetricSink(mockSink1);
channelBuilder.addMetricSink(mockSink2);
createChannel();

LongCounterMetricInstrument counter = metricInstrumentRegistry.registerLongCounter(
"test_counter", "Time taken by metric recorder", "s",
ImmutableList.of("grpc.method"), Collections.emptyList(), false);
List<String> requiredLabelValues = ImmutableList.of("testMethod");
List<String> optionalLabelValues = Collections.emptyList();

NameResolver.Args args = helper.getNameResolverArgs();
assertThat(args.getMetricRecorder()).isNotNull();
args.getMetricRecorder()
.addLongCounter(counter, 10, requiredLabelValues, optionalLabelValues);
verify(mockSink1).addLongCounter(eq(counter), eq(10L), eq(requiredLabelValues),
eq(optionalLabelValues));
verify(mockSink2).addLongCounter(eq(counter), eq(10L), eq(requiredLabelValues),
eq(optionalLabelValues));
}

@Test
public void shutdownWithNoTransportsEverCreated() {
channelBuilder.nameResolverFactory(
Expand Down Expand Up @@ -2240,6 +2264,7 @@ public void lbHelper_getNameResolverArgs() {
assertThat(args.getSynchronizationContext())
.isSameInstanceAs(helper.getSynchronizationContext());
assertThat(args.getServiceConfigParser()).isNotNull();
assertThat(args.getMetricRecorder()).isNotNull();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.xds;

import io.grpc.Internal;
import io.grpc.MetricRecorder;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsInitializationException;
Expand All @@ -36,6 +37,11 @@

public static ObjectPool<XdsClient> getOrCreate(String target)
throws XdsInitializationException {
return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate(target);
return getOrCreate(target, new MetricRecorder() {});

Check warning on line 40 in xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java#L40

Added line #L40 was not covered by tests
}

public static ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder)
throws XdsInitializationException {
return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate(target, metricRecorder);

Check warning on line 45 in xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java#L45

Added line #L45 was not covered by tests
}
}
29 changes: 26 additions & 3 deletions xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.grpc.MetricRecorder;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
Expand Down Expand Up @@ -56,6 +57,7 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
private final Object lock = new Object();
private final AtomicReference<Map<String, ?>> bootstrapOverride = new AtomicReference<>();
private final Map<String, ObjectPool<XdsClient>> targetToXdsClientMap = new ConcurrentHashMap<>();
private final Map<String, MetricRecorder> targetToMetricRecorderMap = new ConcurrentHashMap<>();

SharedXdsClientPoolProvider() {
this(new GrpcBootstrapperImpl());
Expand All @@ -82,7 +84,14 @@ public ObjectPool<XdsClient> get(String target) {
}

@Override
@Nullable
public ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException {
return this.getOrCreate(target, new MetricRecorder() {});
}

@Override
public ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder)
throws XdsInitializationException {
ObjectPool<XdsClient> ref = targetToXdsClientMap.get(target);
if (ref == null) {
synchronized (lock) {
Expand All @@ -98,8 +107,12 @@ public ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitialization
if (bootstrapInfo.servers().isEmpty()) {
throw new XdsInitializationException("No xDS server provided");
}
ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target);
MetricRecorder metricRecorderForTarget = targetToMetricRecorderMap.get(target);
metricRecorder =
metricRecorderForTarget != null ? metricRecorderForTarget : metricRecorder;
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target, metricRecorder);
targetToXdsClientMap.put(target, ref);
targetToMetricRecorderMap.putIfAbsent(target, metricRecorder);
}
}
}
Expand All @@ -111,6 +124,11 @@ public ImmutableList<String> getTargets() {
return ImmutableList.copyOf(targetToXdsClientMap.keySet());
}

@VisibleForTesting
MetricRecorder getMetricRecorder(String target) {
return targetToMetricRecorderMap.get(target);
}


private static class SharedXdsClientPoolProviderHolder {
private static final SharedXdsClientPoolProvider instance = new SharedXdsClientPoolProvider();
Expand All @@ -124,6 +142,7 @@ static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
new ExponentialBackoffPolicy.Provider();
private final BootstrapInfo bootstrapInfo;
private final String target; // The target associated with the xDS client.
private final MetricRecorder metricRecorder;
private final Object lock = new Object();
@GuardedBy("lock")
private ScheduledExecutorService scheduler;
Expand All @@ -133,9 +152,11 @@ static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
private int refCount;

@VisibleForTesting
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target) {
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target,
MetricRecorder metricRecorder) {
this.bootstrapInfo = checkNotNull(bootstrapInfo);
this.target = target;
this.metricRecorder = metricRecorder;
}

@Override
Expand All @@ -154,7 +175,9 @@ public XdsClient getObject() {
GrpcUtil.STOPWATCH_SUPPLIER,
TimeProvider.SYSTEM_TIME_PROVIDER,
MessagePrinter.INSTANCE,
new TlsContextManagerImpl(bootstrapInfo));
new TlsContextManagerImpl(bootstrapInfo),
getTarget(),
new XdsClientMetricReporterImpl(metricRecorder));
}
refCount++;
return xdsClient;
Expand Down
93 changes: 93 additions & 0 deletions xds/src/main/java/io/grpc/xds/XdsClientMetricReporter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.xds;

import io.grpc.Internal;
import io.grpc.xds.client.XdsClient;

/**
* Interface for reporting metrics from the xDS client.
*/
@Internal
public interface XdsClientMetricReporter {

/**
* Reports number of valid and invalid resources.
*
* @param validResourceCount Number of resources that were valid.
* @param invalidResourceCount Number of resources that were invalid.
* @param target Target of the gRPC channel.
* @param xdsServer Target URI of the xDS server with which the XdsClient is communicating.
* @param resourceType Type of XDS resource (e.g., "envoy.config.listener.v3.Listener").
*/
default void reportResourceUpdates(long validResourceCount, long invalidResourceCount,
String target, String xdsServer, String resourceType) {
}

Check warning on line 39 in xds/src/main/java/io/grpc/xds/XdsClientMetricReporter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsClientMetricReporter.java#L39

Added line #L39 was not covered by tests

/**
* Reports number of xDS servers going from healthy to unhealthy.
*
* @param serverFailure Number of xDS server failures.
* @param target Target of the gRPC channel.
* @param xdsServer Target URI of the xDS server with which the XdsClient is communicating.
*/
default void reportServerFailure(long serverFailure, String target, String xdsServer) {
}

Check warning on line 49 in xds/src/main/java/io/grpc/xds/XdsClientMetricReporter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsClientMetricReporter.java#L49

Added line #L49 was not covered by tests

/**
* Sets the {@link XdsClient} instance.
*/
default void setXdsClient(XdsClient xdsClient) {
}

Check warning on line 55 in xds/src/main/java/io/grpc/xds/XdsClientMetricReporter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsClientMetricReporter.java#L55

Added line #L55 was not covered by tests

/**
* Closes the metric reporter.
*/
default void close() {
}

Check warning on line 61 in xds/src/main/java/io/grpc/xds/XdsClientMetricReporter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsClientMetricReporter.java#L61

Added line #L61 was not covered by tests

/**
* Interface for reporting metrics through callback.
*
*/
interface CallbackMetricReporter {
ejona86 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Reports number of resources in each cache state.
*
* @param resourceCount Number of resources.
* @param cacheState Status of the resource metadata
* {@link io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus}.
* @param resourceType Type of XDS resource (e.g., "envoy.config.listener.v3.Listener").
* @param target Target of the gRPC channel.
*/
// TODO(@dnvindhya): include the "authority" label once xds.authority is available.
default void reportResourceCounts(long resourceCount, String cacheState, String resourceType,
String target) {
}

Check warning on line 81 in xds/src/main/java/io/grpc/xds/XdsClientMetricReporter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsClientMetricReporter.java#L81

Added line #L81 was not covered by tests

/**
* Reports whether xDS client has a working ADS stream to the xDS server.
*
* @param isConnected 1 if the client is connected to the xDS server, 0 otherwise.
* @param target Target of the gRPC channel.
* @param xdsServer Target URI of the xDS server with which the XdsClient is communicating.
*/
default void reportServerConnections(int isConnected, String target, String xdsServer) {
}

Check warning on line 91 in xds/src/main/java/io/grpc/xds/XdsClientMetricReporter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsClientMetricReporter.java#L91

Added line #L91 was not covered by tests
}
}
Loading