Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: Add counter and gauge metrics #11661

Merged
merged 13 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,25 +121,27 @@ private static class SharedXdsClientPoolProviderHolder {

@ThreadSafe
@VisibleForTesting
protected class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {

private final BootstrapInfo bootstrapInfo;
private final String target; // The target associated with the xDS client.
private final XdsClientMetricReporterImpl xdsClientMetricReporter;
private final MetricRecorder metricRecorder;
private final Object lock = new Object();
@GuardedBy("lock")
private ScheduledExecutorService scheduler;
@GuardedBy("lock")
private XdsClient xdsClient;
@GuardedBy("lock")
private int refCount;
@GuardedBy("lock")
private XdsClientMetricReporterImpl metricReporter;

@VisibleForTesting
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target,
MetricRecorder metricRecorder) {
this.bootstrapInfo = checkNotNull(bootstrapInfo);
this.target = target;
this.xdsClientMetricReporter = new XdsClientMetricReporterImpl(metricRecorder, target);
this.metricRecorder = metricRecorder;
}

@Override
Expand All @@ -150,6 +152,7 @@ public XdsClient getObject() {
log.log(Level.INFO, "xDS node ID: {0}", bootstrapInfo.node().getId());
}
scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
metricReporter = new XdsClientMetricReporterImpl(metricRecorder, target);
xdsClient = new XdsClientImpl(
DEFAULT_XDS_TRANSPORT_FACTORY,
bootstrapInfo,
Expand All @@ -159,8 +162,8 @@ public XdsClient getObject() {
TimeProvider.SYSTEM_TIME_PROVIDER,
MessagePrinter.INSTANCE,
new TlsContextManagerImpl(bootstrapInfo),
xdsClientMetricReporter);
xdsClientMetricReporter.setXdsClient(xdsClient);
metricReporter);
metricReporter.setXdsClient(xdsClient);
}
refCount++;
return xdsClient;
Expand All @@ -174,7 +177,8 @@ public XdsClient returnObject(Object object) {
if (refCount == 0) {
xdsClient.shutdown();
xdsClient = null;
xdsClientMetricReporter.close();
metricReporter.close();
metricReporter = null;
targetToXdsClientMap.remove(target);
scheduler = SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler);
}
Expand Down
71 changes: 58 additions & 13 deletions xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.xds;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Internal;
import io.grpc.LongCounterMetricInstrument;
Expand All @@ -28,11 +29,19 @@
import io.grpc.MetricRecorder.Registration;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClient.ResourceCallback;
import io.grpc.xds.client.XdsClient.ResourceMetadata;
import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus;
import io.grpc.xds.client.XdsClient.ServerConnectionCallback;
import io.grpc.xds.client.XdsClientMetricReporter;
import io.grpc.xds.client.XdsResourceType;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand All @@ -41,7 +50,7 @@
* XdsClientMetricReporter implementation.
*/
@Internal
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
public class XdsClientMetricReporterImpl implements XdsClientMetricReporter {
final class XdsClientMetricReporterImpl implements XdsClientMetricReporter {

private static final Logger logger = Logger.getLogger(
XdsClientMetricReporterImpl.class.getName());
Expand All @@ -55,8 +64,6 @@ public class XdsClientMetricReporterImpl implements XdsClientMetricReporter {
private final String target;
@Nullable
private Registration gaugeRegistration = null;
@Nullable
private XdsClient xdsClient = null;

static {
MetricInstrumentRegistry metricInstrumentRegistry
Expand Down Expand Up @@ -111,41 +118,79 @@ public void reportServerFailure(long serverFailure, String xdsServer) {
Arrays.asList(target, xdsServer), Collections.emptyList());
}

void setXdsClient(XdsClient client) {
this.xdsClient = client;
void setXdsClient(XdsClient xdsClient) {
assert gaugeRegistration == null;
// register gauge here
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
this.gaugeRegistration = metricRecorder.registerBatchCallback(new BatchCallback() {
@Override
public void accept(BatchRecorder recorder) {
reportCallbackMetrics(recorder);
reportCallbackMetrics(recorder, xdsClient);
}
}, CONNECTED_GAUGE, RESOURCES_GAUGE);
}

void close() {
if (gaugeRegistration != null) {
gaugeRegistration.close();
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
gaugeRegistration = null;
}
}

void reportCallbackMetrics(BatchRecorder recorder) {
void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) {
MetricReporterCallback callback = new MetricReporterCallback(recorder, target);
try {
SettableFuture<Void> reportResourceCountsCompleted = this.xdsClient.reportResourceCounts(
callback);
SettableFuture<Void> reportServerConnectionsCompleted =
this.xdsClient.reportServerConnections(callback);
xdsClient.reportServerConnections(callback);

ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
getResourceMetadataCompleted = xdsClient.getSubscribedResourcesMetadataSnapshot();

Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType =
getResourceMetadataCompleted.get(10, TimeUnit.SECONDS);

SettableFuture<Void> reportResourceCountsCompleted = computeAndReportResourceCounts(
metadataByType, callback);

// Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking
Void unused1 = reportResourceCountsCompleted.get(5, TimeUnit.SECONDS);
Void unused2 = reportServerConnectionsCompleted.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
Void unused1 = reportServerConnectionsCompleted.get(5, TimeUnit.SECONDS);
Void unused2 = reportResourceCountsCompleted.get(5, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt(); // re-set the current thread's interruption state
}
logger.log(Level.WARNING, "Failed to report gauge metrics", e);
}
}

private SettableFuture<Void> computeAndReportResourceCounts(
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType,
MetricReporterCallback callback) {
SettableFuture<Void> future = SettableFuture.create();
ejona86 marked this conversation as resolved.
Show resolved Hide resolved

for (Map.Entry<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByTypeEntry :
metadataByType.entrySet()) {
XdsResourceType<?> type = metadataByTypeEntry.getKey();

Map<String, Long> resourceCountsByState = new HashMap<>();
for (ResourceMetadata metadata : metadataByTypeEntry.getValue().values()) {
String cacheState = cacheStateFromResourceStatus(metadata.getStatus(), metadata.isCached());
resourceCountsByState.compute(cacheState, (k, v) -> (v == null) ? 1 : v + 1);
}

resourceCountsByState.forEach((cacheState, count) ->
callback.reportResourceCountGauge(count, cacheState, type.typeUrl()));
}
future.set(null);
return future;
}

private static String cacheStateFromResourceStatus(ResourceMetadataStatus metadataStatus,
boolean isResourceCached) {
String status = metadataStatus.toString().toLowerCase(Locale.ROOT);
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
return metadataStatus == ResourceMetadataStatus.NACKED && isResourceCached
? status + "_but_cached" : status;
}

@VisibleForTesting
static final class MetricReporterCallback implements ResourceCallback,
ServerConnectionCallback {
Expand Down
8 changes: 1 addition & 7 deletions xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,7 @@ void readyHandler() {
*/
// Must be synchronized
boolean hasWorkingAdsStream() {
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
if (streamClosedNoResponse || shutdown) {
return false;
}
if (adsStream == null) {
return true;
}
return adsStream.responseReceived || !adsStream.closed;
return !streamClosedNoResponse;
}


Expand Down
43 changes: 18 additions & 25 deletions xds/src/main/java/io/grpc/xds/client/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,44 +155,46 @@ public static final class ResourceMetadata {
private final String version;
private final ResourceMetadataStatus status;
private final long updateTimeNanos;
private final boolean cached;
@Nullable private final Any rawResource;
@Nullable private final UpdateFailureState errorState;

private ResourceMetadata(
ResourceMetadataStatus status, String version, long updateTimeNanos,
ResourceMetadataStatus status, String version, long updateTimeNanos, boolean cached,
@Nullable Any rawResource, @Nullable UpdateFailureState errorState) {
this.status = checkNotNull(status, "status");
this.version = checkNotNull(version, "version");
this.updateTimeNanos = updateTimeNanos;
this.cached = cached;
this.rawResource = rawResource;
this.errorState = errorState;
}

static ResourceMetadata newResourceMetadataUnknown() {
return new ResourceMetadata(ResourceMetadataStatus.UNKNOWN, "", 0, null, null);
public static ResourceMetadata newResourceMetadataUnknown() {
return new ResourceMetadata(ResourceMetadataStatus.UNKNOWN, "", 0, false,null, null);
}

static ResourceMetadata newResourceMetadataRequested() {
return new ResourceMetadata(ResourceMetadataStatus.REQUESTED, "", 0, null, null);
public static ResourceMetadata newResourceMetadataRequested(boolean cached) {
return new ResourceMetadata(ResourceMetadataStatus.REQUESTED, "", 0, cached, null, null);
}

static ResourceMetadata newResourceMetadataDoesNotExist() {
return new ResourceMetadata(ResourceMetadataStatus.DOES_NOT_EXIST, "", 0, null, null);
public static ResourceMetadata newResourceMetadataDoesNotExist() {
return new ResourceMetadata(ResourceMetadataStatus.DOES_NOT_EXIST, "", 0, false, null, null);
}

public static ResourceMetadata newResourceMetadataAcked(
Any rawResource, String version, long updateTimeNanos) {
checkNotNull(rawResource, "rawResource");
return new ResourceMetadata(
ResourceMetadataStatus.ACKED, version, updateTimeNanos, rawResource, null);
ResourceMetadataStatus.ACKED, version, updateTimeNanos, true, rawResource, null);
}

static ResourceMetadata newResourceMetadataNacked(
public static ResourceMetadata newResourceMetadataNacked(
ResourceMetadata metadata, String failedVersion, long failedUpdateTime,
String failedDetails) {
String failedDetails, boolean cached) {
checkNotNull(metadata, "metadata");
return new ResourceMetadata(ResourceMetadataStatus.NACKED,
metadata.getVersion(), metadata.getUpdateTimeNanos(), metadata.getRawResource(),
metadata.getVersion(), metadata.getUpdateTimeNanos(), cached, metadata.getRawResource(),
new UpdateFailureState(failedVersion, failedUpdateTime, failedDetails));
}

Expand All @@ -211,6 +213,11 @@ public long getUpdateTimeNanos() {
return updateTimeNanos;
}

/** Returns whether the resource was cached. */
public boolean isCached() {
return cached;
}

/** The last successfully updated xDS resource as it was returned by the server. */
@Nullable
public Any getRawResource() {
Expand Down Expand Up @@ -390,20 +397,6 @@ public interface ServerConnectionCallback {
void reportServerConnectionGauge(boolean isConnected, String xdsServer);
}

/**
* Reports the number of resources in each cache state.
*
* <p>Cache state is determined by two factors:
* <ul>
* <li>Whether the resource is cached.
* <li>The {@link io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus} of the
* resource.
* </ul>
*/
public SettableFuture<Void> reportResourceCounts(ResourceCallback callback) {
throw new UnsupportedOperationException();
}

/**
* Reports whether xDS client has a working ADS stream to xDS server.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this working or non-errored?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used working as it was used in metric description of grpc.xds_client.connected.
Also, I think it is more nuanced than non-errored, because if there is an error on ADS stream and / or close before receiving a response, it is an error and value will remain false until ADS stream receives a response to say it has a working stream to communicate with server.
Added link to A78 which has definition for working stream. Let me know what you think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't the dictionary definition of "working", but you're right that it is the proposal's definition. I'd be happier if you put quotes around working, but it's up to you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added quotes around working.

* The definition of a working stream is defined in gRFC A78.
Expand Down
37 changes: 5 additions & 32 deletions xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,13 @@
import io.grpc.internal.TimeProvider;
import io.grpc.xds.client.Bootstrapper.AuthorityInfo;
import io.grpc.xds.client.Bootstrapper.ServerInfo;
import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus;
import io.grpc.xds.client.XdsClient.ResourceStore;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -542,32 +540,6 @@ public SettableFuture<Void> reportServerConnections(ServerConnectionCallback cal
return future;
}

@Override
public SettableFuture<Void> reportResourceCounts(ResourceCallback callback) {
SettableFuture<Void> future = SettableFuture.create();
syncContext.execute(() -> {
for (XdsResourceType<? extends ResourceUpdate> resourceType : resourceSubscribers.keySet()) {
Map<String, Long> resourceCountsByState = new HashMap<>();
for (ResourceSubscriber<? extends ResourceUpdate> subscriber :
resourceSubscribers.get(resourceType).values()) {
String cacheState = cacheStateFromResourceStatus(subscriber.metadata,
subscriber.data != null);
resourceCountsByState.compute(cacheState, (k, v) -> (v == null) ? 1 : v + 1);
}
resourceCountsByState.forEach((cacheState, count) ->
callback.reportResourceCountGauge(count, cacheState, resourceType.typeUrl()));
}
future.set(null);
});
return future;
}

private String cacheStateFromResourceStatus(ResourceMetadata metadata, boolean isResourceCached) {
String status = metadata.getStatus().toString().toLowerCase(Locale.ROOT);
return metadata.getStatus() == ResourceMetadataStatus.NACKED && isResourceCached
? status + "_but_cached" : status;
}

/** Tracks a single subscribed resource. */
private final class ResourceSubscriber<T extends ResourceUpdate> {
@Nullable private final ServerInfo serverInfo;
Expand Down Expand Up @@ -663,7 +635,7 @@ public String toString() {
}

// Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED.
metadata = ResourceMetadata.newResourceMetadataRequested();
metadata = ResourceMetadata.newResourceMetadataRequested(this.data != null);

respTimer = syncContext.schedule(
new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS,
Expand Down Expand Up @@ -706,10 +678,10 @@ void onData(ParsedResource<T> parsedResource, String version, long updateTime,
respTimer.cancel();
respTimer = null;
}
this.metadata = ResourceMetadata
.newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime);
ResourceUpdate oldData = this.data;
this.data = parsedResource.getResourceUpdate();
this.metadata = ResourceMetadata
.newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime);
absent = false;
if (resourceDeletionIgnored) {
logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version "
Expand Down Expand Up @@ -803,7 +775,8 @@ void onError(Status error, @Nullable ProcessingTracker tracker) {

void onRejected(String rejectedVersion, long rejectedTime, String rejectedDetails) {
metadata = ResourceMetadata
.newResourceMetadataNacked(metadata, rejectedVersion, rejectedTime, rejectedDetails);
.newResourceMetadataNacked(metadata, rejectedVersion, rejectedTime, rejectedDetails,
data != null);
}

private void notifyWatcher(ResourceWatcher<T> watcher, T update) {
Expand Down
Loading
Loading