Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: Add counter and gauge metrics #11661

Merged
merged 13 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions api/src/main/java/io/grpc/NameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,6 @@
* Returns the {@link MetricRecorder} that the channel uses to record metrics.
*/
@Nullable
@ExperimentalApi("Insert GitHub issue")
public MetricRecorder getMetricRecorder() {
return metricRecorder;
}
Expand All @@ -427,7 +426,7 @@
.add("channelLogger", channelLogger)
.add("executor", executor)
.add("overrideAuthority", overrideAuthority)
.add("metricRecorder", metricRecorder)

Check warning on line 429 in api/src/main/java/io/grpc/NameResolver.java

View check run for this annotation

Codecov / codecov/patch

api/src/main/java/io/grpc/NameResolver.java#L429

Added line #L429 was not covered by tests
.toString();
}

Expand Down Expand Up @@ -563,7 +562,6 @@
/**
* See {@link Args#getMetricRecorder()}. This is an optional field.
*/
@ExperimentalApi("Insert github issue")
public Builder setMetricRecorder(MetricRecorder metricRecorder) {
this.metricRecorder = metricRecorder;
return this;
Expand Down
32 changes: 9 additions & 23 deletions xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
private final Object lock = new Object();
private final AtomicReference<Map<String, ?>> bootstrapOverride = new AtomicReference<>();
private final Map<String, ObjectPool<XdsClient>> targetToXdsClientMap = new ConcurrentHashMap<>();
private final Map<String, MetricRecorder> targetToMetricRecorderMap = new ConcurrentHashMap<>();

SharedXdsClientPoolProvider() {
this(new GrpcBootstrapperImpl());
Expand All @@ -83,12 +82,6 @@ public ObjectPool<XdsClient> get(String target) {
return targetToXdsClientMap.get(target);
}

@Override
@Nullable
public ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException {
return this.getOrCreate(target, new MetricRecorder() {});
}

@Override
public ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder)
throws XdsInitializationException {
Expand All @@ -107,12 +100,8 @@ public ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRec
if (bootstrapInfo.servers().isEmpty()) {
throw new XdsInitializationException("No xDS server provided");
}
MetricRecorder metricRecorderForTarget = targetToMetricRecorderMap.get(target);
metricRecorder =
metricRecorderForTarget != null ? metricRecorderForTarget : metricRecorder;
ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target, metricRecorder);
targetToXdsClientMap.put(target, ref);
targetToMetricRecorderMap.putIfAbsent(target, metricRecorder);
}
}
}
Expand All @@ -124,25 +113,19 @@ public ImmutableList<String> getTargets() {
return ImmutableList.copyOf(targetToXdsClientMap.keySet());
}

@VisibleForTesting
MetricRecorder getMetricRecorder(String target) {
return targetToMetricRecorderMap.get(target);
}


private static class SharedXdsClientPoolProviderHolder {
private static final SharedXdsClientPoolProvider instance = new SharedXdsClientPoolProvider();
}

@ThreadSafe
@VisibleForTesting
static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
protected class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
ejona86 marked this conversation as resolved.
Show resolved Hide resolved

private static final ExponentialBackoffPolicy.Provider BACKOFF_POLICY_PROVIDER =
private final ExponentialBackoffPolicy.Provider backoffPolicyProvider =
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
new ExponentialBackoffPolicy.Provider();
private final BootstrapInfo bootstrapInfo;
private final String target; // The target associated with the xDS client.
private final MetricRecorder metricRecorder;
private final XdsClientMetricReporterImpl xdsClientMetricReporter;
private final Object lock = new Object();
@GuardedBy("lock")
private ScheduledExecutorService scheduler;
Expand All @@ -156,7 +139,7 @@ static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
MetricRecorder metricRecorder) {
this.bootstrapInfo = checkNotNull(bootstrapInfo);
this.target = target;
this.metricRecorder = metricRecorder;
this.xdsClientMetricReporter = new XdsClientMetricReporterImpl(metricRecorder);
}

@Override
Expand All @@ -171,13 +154,14 @@ public XdsClient getObject() {
DEFAULT_XDS_TRANSPORT_FACTORY,
bootstrapInfo,
scheduler,
BACKOFF_POLICY_PROVIDER,
backoffPolicyProvider,
GrpcUtil.STOPWATCH_SUPPLIER,
TimeProvider.SYSTEM_TIME_PROVIDER,
MessagePrinter.INSTANCE,
new TlsContextManagerImpl(bootstrapInfo),
getTarget(),
new XdsClientMetricReporterImpl(metricRecorder));
xdsClientMetricReporter);
xdsClientMetricReporter.setXdsClient(xdsClient);
}
refCount++;
return xdsClient;
Expand All @@ -191,6 +175,8 @@ public XdsClient returnObject(Object object) {
if (refCount == 0) {
xdsClient.shutdown();
xdsClient = null;
xdsClientMetricReporter.close();
targetToXdsClientMap.remove(target);
scheduler = SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler);
}
return null;
Expand Down
40 changes: 13 additions & 27 deletions xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.grpc.MetricRecorder.BatchRecorder;
import io.grpc.MetricRecorder.Registration;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClientMetricReporter;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
Expand All @@ -53,8 +54,6 @@
private Registration gaugeRegistration = null;
@Nullable
private XdsClient xdsClient = null;
@Nullable
private CallbackMetricReporter callbackMetricReporter = null;

static {
MetricInstrumentRegistry metricInstrumentRegistry
Expand Down Expand Up @@ -108,8 +107,7 @@
Arrays.asList(target, xdsServer), Collections.emptyList());
}

@Override
public void setXdsClient(XdsClient client) {
void setXdsClient(XdsClient client) {
this.xdsClient = client;
// register gauge here
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
this.gaugeRegistration = metricRecorder.registerBatchCallback(new BatchCallback() {
Expand All @@ -120,48 +118,36 @@
}, CONNECTED_GAUGE, RESOURCES_GAUGE);
}

@Override
public void close() {
void close() {
if (gaugeRegistration != null) {
gaugeRegistration.close();
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
}
}

void reportCallbackMetrics(BatchRecorder recorder) {
if (callbackMetricReporter == null) {
// Instantiate only if not injected
callbackMetricReporter = new CallbackMetricReporterImpl(recorder);
}
CallbackMetricReporter callbackMetricReporter = createCallbackMetricReporter(recorder);
try {
reportResourceCounts(callbackMetricReporter);
reportServerConnections(callbackMetricReporter);
SettableFuture<Void> ret = this.xdsClient.reportResourceCounts(
callbackMetricReporter);
// Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking
Void unused = ret.get(5, TimeUnit.SECONDS);
SettableFuture<Void> ret1 = this.xdsClient.reportServerConnections(callbackMetricReporter);
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
// Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking
Void unused1 = ret1.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt(); // re-set the current thread's interruption state

Check warning on line 139 in xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java#L139

Added line #L139 was not covered by tests
}
logger.log(Level.WARNING, "Failed to report gauge metrics", e);
}
}

void reportResourceCounts(CallbackMetricReporter callbackMetricReporter) throws Exception {
SettableFuture<Void> ret = this.xdsClient.reportResourceCounts(
callbackMetricReporter);
// Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking
Void unused = ret.get(5, TimeUnit.SECONDS);
}

void reportServerConnections(CallbackMetricReporter callbackMetricReporter) throws Exception {
SettableFuture<Void> ret = this.xdsClient.reportServerConnections(callbackMetricReporter);
// Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking
Void unused = ret.get(5, TimeUnit.SECONDS);
}

/**
* Allows injecting a custom {@link CallbackMetricReporter} for testing purposes.
*/
@VisibleForTesting
void injectCallbackMetricReporter(CallbackMetricReporter reporter) {
this.callbackMetricReporter = reporter;
CallbackMetricReporter createCallbackMetricReporter(BatchRecorder recorder) {
return new CallbackMetricReporterImpl(recorder);
}

@VisibleForTesting
Expand Down
2 changes: 0 additions & 2 deletions xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ interface XdsClientPoolFactory {
@Nullable
ObjectPool<XdsClient> get(String target);

ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException;

ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder)
throws XdsInitializationException;

Expand Down
3 changes: 2 additions & 1 deletion xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.grpc.InternalServerInterceptors;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MetricRecorder;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
Expand Down Expand Up @@ -171,7 +172,7 @@ public void run() {

private void internalStart() {
try {
xdsClientPool = xdsClientPoolFactory.getOrCreate("");
xdsClientPool = xdsClientPoolFactory.getOrCreate("", new MetricRecorder() {});
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
} catch (Exception e) {
StatusException statusException = Status.UNAVAILABLE.withDescription(
"Failed to initialize xDS").withCause(e).asException();
Expand Down
2 changes: 1 addition & 1 deletion xds/src/main/java/io/grpc/xds/client/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import com.google.protobuf.Any;
import io.grpc.ExperimentalApi;
import io.grpc.Status;
import io.grpc.xds.XdsClientMetricReporter.CallbackMetricReporter;
import io.grpc.xds.client.Bootstrapper.ServerInfo;
import io.grpc.xds.client.XdsClientMetricReporter.CallbackMetricReporter;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
Expand Down Expand Up @@ -391,7 +391,7 @@
* </ul>
*/
public SettableFuture<Void> reportResourceCounts(CallbackMetricReporter callbackMetricReporter) {
throw new UnsupportedOperationException();

Check warning on line 394 in xds/src/main/java/io/grpc/xds/client/XdsClient.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/client/XdsClient.java#L394

Added line #L394 was not covered by tests
}

/**
Expand All @@ -400,7 +400,7 @@
*/
public SettableFuture<Void> reportServerConnections(
CallbackMetricReporter callbackMetricReporter) {
throw new UnsupportedOperationException();

Check warning on line 403 in xds/src/main/java/io/grpc/xds/client/XdsClient.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/client/XdsClient.java#L403

Added line #L403 was not covered by tests
}

static final class ProcessingTracker {
Expand Down
6 changes: 2 additions & 4 deletions xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.TimeProvider;
import io.grpc.xds.XdsClientMetricReporter;
import io.grpc.xds.XdsClientMetricReporter.CallbackMetricReporter;
import io.grpc.xds.client.Bootstrapper.AuthorityInfo;
import io.grpc.xds.client.Bootstrapper.ServerInfo;
import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus;
import io.grpc.xds.client.XdsClient.ResourceStore;
import io.grpc.xds.client.XdsClientMetricReporter;
import io.grpc.xds.client.XdsClientMetricReporter.CallbackMetricReporter;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import java.net.URI;
import java.util.Collection;
Expand Down Expand Up @@ -127,7 +127,6 @@ public XdsClientImpl(
this.securityConfig = securityConfig;
this.target = target;
this.metricReporter = metricReporter;
metricReporter.setXdsClient(this);
logId = InternalLogId.allocate("xds-client", null);
logger = XdsLogger.withLogId(logId);
logger.log(XdsLogLevel.INFO, "Created");
Expand Down Expand Up @@ -191,7 +190,6 @@ public void run() {
for (final LoadReportClient lrsClient : serverLrsClientMap.values()) {
lrsClient.stopLoadReporting();
}
metricReporter.close();
cleanUpResourceTimers();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
* limitations under the License.
*/

package io.grpc.xds;
package io.grpc.xds.client;

import io.grpc.Internal;
import io.grpc.xds.client.XdsClient;

/**
* Interface for reporting metrics from the xDS client.
Expand All @@ -36,7 +35,7 @@
*/
default void reportResourceUpdates(long validResourceCount, long invalidResourceCount,
String target, String xdsServer, String resourceType) {
}

Check warning on line 38 in xds/src/main/java/io/grpc/xds/client/XdsClientMetricReporter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/client/XdsClientMetricReporter.java#L38

Added line #L38 was not covered by tests

/**
* Reports number of xDS servers going from healthy to unhealthy.
Expand All @@ -46,48 +45,22 @@
* @param xdsServer Target URI of the xDS server with which the XdsClient is communicating.
*/
default void reportServerFailure(long serverFailure, String target, String xdsServer) {
}

Check warning on line 48 in xds/src/main/java/io/grpc/xds/client/XdsClientMetricReporter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/client/XdsClientMetricReporter.java#L48

Added line #L48 was not covered by tests

/**
* Sets the {@link XdsClient} instance.
*/
default void setXdsClient(XdsClient xdsClient) {
}

/**
* Closes the metric reporter.
*/
default void close() {
}

/**
* Interface for reporting metrics through callback.
*
*/
interface CallbackMetricReporter {

/**
* Reports number of resources in each cache state.
*
* @param resourceCount Number of resources.
* @param cacheState Status of the resource metadata
* {@link io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus}.
* @param resourceType Type of XDS resource (e.g., "envoy.config.listener.v3.Listener").
* @param target Target of the gRPC channel.
*/

// TODO(@dnvindhya): include the "authority" label once xds.authority is available.
default void reportResourceCounts(long resourceCount, String cacheState, String resourceType,
String target) {
}

Check warning on line 60 in xds/src/main/java/io/grpc/xds/client/XdsClientMetricReporter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/client/XdsClientMetricReporter.java#L60

Added line #L60 was not covered by tests

/**
* Reports whether xDS client has a working ADS stream to the xDS server.
*
* @param isConnected 1 if the client is connected to the xDS server, 0 otherwise.
* @param target Target of the gRPC channel.
* @param xdsServer Target URI of the xDS server with which the XdsClient is communicating.
*/

default void reportServerConnections(int isConnected, String target, String xdsServer) {
}

Check warning on line 64 in xds/src/main/java/io/grpc/xds/client/XdsClientMetricReporter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/client/XdsClientMetricReporter.java#L64

Added line #L64 was not covered by tests
}
}
4 changes: 0 additions & 4 deletions xds/src/test/java/io/grpc/xds/CsdsServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -554,10 +554,6 @@ public void setBootstrapOverride(Map<String, ?> bootstrap) {
throw new UnsupportedOperationException("Should not be called");
}

@Override
public ObjectPool<XdsClient> getOrCreate(String target) {
throw new UnsupportedOperationException("Should not be called");
}

@Override
public ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import io.grpc.xds.client.XdsClient.ResourceUpdate;
import io.grpc.xds.client.XdsClient.ResourceWatcher;
import io.grpc.xds.client.XdsClientImpl;
import io.grpc.xds.client.XdsClientMetricReporter;
import io.grpc.xds.client.XdsResourceType;
import io.grpc.xds.client.XdsResourceType.ResourceInvalidException;
import io.grpc.xds.client.XdsTransportFactory;
Expand Down
Loading
Loading