Skip to content

Commit

Permalink
Extracting vmId from Azure instance metadata (#27692)
Browse files Browse the repository at this point in the history
* Extracting vmId from Azure instance metadata (which can easily be mapped to containerId/nodeId)

* Update CHANGELOG.md

* Responded to code review feedback

* Update ClientTelemetryInfo.java

* Update CHANGELOG.md

* Added more tests

* Update pom.xml

* Update module-info.java

* Dummy

* Update pom.xml

* Fixes static analysis findings

* Reverting changing package name

* Addressed code review comments

* Update RxDocumentClientImpl.java

* Reacted to code review comments

* Update spotbugs-exclude.xml

* Revert spotbug changes
  • Loading branch information
FabianMeiswinkel authored Mar 16, 2022
1 parent 0f41bd1 commit 427d396
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 21 deletions.
3 changes: 3 additions & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
## Release History

### 4.28.0-beta.1 (Unreleased)
#### Features Added
* Added the "VM Unique ID" - see [Accessing and Using Azure VM Unique ID](https://azure.microsoft.com/blog/accessing-and-using-azure-vm-unique-id/) - to the request diagnostics. This information helps to simplify investigating any network issues between an application hosted in Azure and the corresponding Cosmos DB service endpoint. - See [PR 27692](https://github.com/Azure/azure-sdk-for-java/pull/27692)

#### Key Bugs Fixes
* Added `decodeTime` in `CosmosDiagnostics` - See [PR 22808](https://github.com/Azure/azure-sdk-for-java/pull/22808)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.cosmos.ConnectionMode;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient;
import com.azure.cosmos.implementation.guava27.Strings;
import com.azure.cosmos.implementation.http.HttpClientConfig;
Expand Down Expand Up @@ -50,6 +51,7 @@ public void serialize(DiagnosticsClientContext clientContext, JsonGenerator gene
generator.writeStartObject();
try {
generator.writeNumberField("id", clientContext.getConfig().getClientId());
generator.writeStringField("machineId", ClientTelemetry.getMachineId(clientContext));
generator.writeStringField("connectionMode", clientContext.getConfig().getConnectionMode().toString());
generator.writeNumberField("numberOfClients", clientContext.getConfig().getActiveClientsCount());
generator.writeObjectFieldStart("connCfg");
Expand Down Expand Up @@ -87,6 +89,11 @@ class DiagnosticsClientConfig {
private RntbdTransportClient.Options options;
private String rntbdConfigAsString;
private ConnectionMode connectionMode;
private String machineId;

/**
 * Stores the machine identifier on this diagnostics config.
 *
 * @param machineId the machine identifier to keep; replaces any previously stored value
 */
public void withMachineId(String machineId) {
this.machineId = machineId;
}

public void withActiveClientCounter(AtomicInteger activeClientsCnt) {
this.activeClientsCnt = activeClientsCnt;
Expand Down Expand Up @@ -178,6 +185,8 @@ public int getClientId() {
return this.clientId;
}

/**
 * Returns the machine identifier stored on this diagnostics config.
 *
 * @return the stored machine identifier, or {@code null} if none has been assigned
 */
public String getMachineId() {
    return this.machineId;
}

/**
 * Reports the current value of the active-clients counter.
 *
 * @return the counter's current value, or {@code -1} when no counter has been attached
 */
public int getActiveClientsCount() {
    if (this.activeClientsCnt == null) {
        // No counter attached: -1 signals "unknown".
        return -1;
    }

    return this.activeClientsCnt.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.DirectConnectionConfig;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.ApiType;
import com.azure.cosmos.implementation.batch.BatchResponseParser;
import com.azure.cosmos.implementation.batch.PartitionKeyRangeServerBatchRequest;
import com.azure.cosmos.implementation.batch.ServerBatchRequest;
Expand Down Expand Up @@ -110,6 +109,7 @@
*/
public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorizationTokenProvider, CpuMemoryListener,
DiagnosticsClientContext {
private static final String tempMachineId = "uuid:" + UUID.randomUUID();
private static final AtomicInteger activeClientsCnt = new AtomicInteger(0);
private static final AtomicInteger clientIdGenerator = new AtomicInteger(0);
private static final Range<String> RANGE_INCLUDING_ALL_PARTITION_KEY_RANGES = new Range<>(
Expand Down Expand Up @@ -289,7 +289,7 @@ private RxDocumentClientImpl(URI serviceEndpoint,
ApiType apiType) {

activeClientsCnt.incrementAndGet();
this.clientId = clientIdGenerator.getAndDecrement();
this.clientId = clientIdGenerator.incrementAndGet();
this.diagnosticsClientConfig = new DiagnosticsClientConfig();
this.diagnosticsClientConfig.withClientId(this.clientId);
this.diagnosticsClientConfig.withActiveClientCounter(activeClientsCnt);
Expand Down Expand Up @@ -350,6 +350,7 @@ private RxDocumentClientImpl(URI serviceEndpoint,
this.diagnosticsClientConfig.withMultipleWriteRegionsEnabled(this.connectionPolicy.isMultipleWriteRegionsEnabled());
this.diagnosticsClientConfig.withEndpointDiscoveryEnabled(this.connectionPolicy.isEndpointDiscoveryEnabled());
this.diagnosticsClientConfig.withPreferredRegions(this.connectionPolicy.getPreferredRegions());
this.diagnosticsClientConfig.withMachineId(tempMachineId);

boolean disableSessionCapturing = (ConsistencyLevel.SESSION != consistencyLevel && !sessionCapturingOverrideEnabled);

Expand Down Expand Up @@ -457,7 +458,7 @@ public void init(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot, Func
collectionCache);

updateGatewayProxy();
clientTelemetry = new ClientTelemetry(null, UUID.randomUUID().toString(),
clientTelemetry = new ClientTelemetry(this, null, UUID.randomUUID().toString(),
ManagementFactory.getRuntimeMXBean().getName(), userAgentContainer.getUserAgent(),
connectionPolicy.getConnectionMode(), globalEndpointManager.getLatestDatabaseAccount().getId(),
null, null, this.reactorHttpClient, connectionPolicy.isClientTelemetryEnabled(), this, this.connectionPolicy.getPreferredRegions());
Expand Down Expand Up @@ -4058,6 +4059,7 @@ private RxStoreModel getStoreProxy(RxDocumentServiceRequest request) {
public void close() {
logger.info("Attempting to close client {}", this.clientId);
if (!closed.getAndSet(true)) {
activeClientsCnt.decrementAndGet();
logger.info("Shutting down ...");
logger.info("Closing Global Endpoint Manager ...");
LifeCycleUtils.closeQuietly(this.globalEndpointManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ public String getVmSize() {
return compute != null ? compute.getVmSize() : StringUtils.EMPTY;
}

/**
 * Returns the VM id taken from the compute section of this metadata.
 *
 * @return the compute section's VM id, or an empty string when there is no compute section
 */
public String getVmId() {
    if (compute == null) {
        return StringUtils.EMPTY;
    }

    return compute.getVmId();
}

/**
 * Returns the compute section of this metadata.
 *
 * @return the compute section; may be {@code null}
 */
public Compute getCompute() {
    return this.compute;
}
Expand All @@ -44,6 +48,7 @@ public static class Compute {
private String azEnvironment;
private String osType;
private String vmSize;
private String vmId;

public String getSku() {
return sku;
Expand Down Expand Up @@ -84,5 +89,13 @@ public String getVmSize() {
/**
 * Sets the VM size recorded in this compute section.
 *
 * @param vmSize the VM size value to store
 */
public void setVmSize(String vmSize) {
this.vmSize = vmSize;
}

/**
 * Returns the VM id recorded in this compute section.
 *
 * @return the stored VM id, or {@code null} if none has been set
 */
public String getVmId() {
    return this.vmId;
}

/**
 * Sets the VM id recorded in this compute section.
 *
 * @param vmId the VM id value to store
 */
public void setVmId(String vmId) {
this.vmId = vmId;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.azure.cosmos.implementation.Constants;
import com.azure.cosmos.implementation.CosmosDaemonThreadFactory;
import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.IAuthorizationTokenProvider;
import com.azure.cosmos.implementation.RequestVerb;
Expand Down Expand Up @@ -47,6 +48,7 @@
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class ClientTelemetry {
public final static int ONE_KB_TO_BYTES = 1024;
Expand Down Expand Up @@ -78,6 +80,8 @@ public class ClientTelemetry {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final static AtomicLong instanceCount = new AtomicLong(0);
private final static AtomicReference<AzureVMMetadata> azureVmMetaDataSingleton =
new AtomicReference<>(null);
private ClientTelemetryInfo clientTelemetryInfo;
private final HttpClient httpClient;
private final ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
Expand All @@ -98,7 +102,8 @@ public class ClientTelemetry {
private final IAuthorizationTokenProvider tokenProvider;
private final String globalDatabaseAccountName;

public ClientTelemetry(Boolean acceleratedNetworking,
public ClientTelemetry(DiagnosticsClientContext diagnosticsClientContext,
Boolean acceleratedNetworking,
String clientId,
String processId,
String userAgent,
Expand All @@ -111,8 +116,17 @@ public ClientTelemetry(Boolean acceleratedNetworking,
IAuthorizationTokenProvider tokenProvider,
List<String> preferredRegions
) {
clientTelemetryInfo = new ClientTelemetryInfo(clientId, processId, userAgent, connectionMode,
globalDatabaseAccountName, applicationRegion, hostEnvInfo, acceleratedNetworking, preferredRegions);
clientTelemetryInfo = new ClientTelemetryInfo(
getMachineId(diagnosticsClientContext),
clientId,
processId,
userAgent,
connectionMode,
globalDatabaseAccountName,
applicationRegion,
hostEnvInfo,
acceleratedNetworking,
preferredRegions);
this.isClosed = false;
this.httpClient = httpClient;
this.isClientTelemetryEnabled = isClientTelemetryEnabled;
Expand All @@ -125,6 +139,24 @@ public ClientTelemetryInfo getClientTelemetryInfo() {
return clientTelemetryInfo;
}

/**
 * Resolves the machine identifier to report for the given client context.
 *
 * <p>When the cached Azure VM metadata carries a usable VM id, the result is
 * {@code "vmId:" + vmId}; that value is also written back to the context's config
 * (if a context was supplied). Otherwise the identifier currently held by the
 * context's config is returned.
 *
 * @param diagnosticsClientContext the client context whose config holds the fallback
 *                                 machine id; may be {@code null}
 * @return the resolved machine id, or an empty string when no VM id is available and
 *         no context was supplied
 */
public static String getMachineId(DiagnosticsClientContext diagnosticsClientContext) {
    AzureVMMetadata metadataSnapshot = azureVmMetaDataSingleton.get();
    String vmId = metadataSnapshot != null ? metadataSnapshot.getVmId() : null;

    // The metadata getter can hand back an empty string (no compute section); treating
    // that as "present" would publish the meaningless id "vmId:" and overwrite the
    // fallback id stored in the config.
    if (vmId != null && !vmId.isEmpty()) {
        String machineId = "vmId:" + vmId;
        if (diagnosticsClientContext != null) {
            diagnosticsClientContext.getConfig().withMachineId(machineId);
        }
        return machineId;
    }

    if (diagnosticsClientContext == null) {
        return "";
    }

    return diagnosticsClientContext.getConfig().getMachineId();
}

public static void recordValue(ConcurrentDoubleHistogram doubleHistogram, long value) {
try {
doubleHistogram.recordValue(value);
Expand Down Expand Up @@ -239,7 +271,21 @@ private Mono<Void> sendClientTelemetry() {
}).subscribeOn(scheduler);
}

/**
 * Copies values from the given Azure VM metadata into this client's telemetry info:
 * the application region, the machine id (prefixed with {@code "vmId:"}) and the host
 * environment info, formatted as {@code osType|sku|vmSize|azEnvironment}.
 *
 * @param azureVMMetadata the metadata to read from
 */
private void populateAzureVmMetaData(AzureVMMetadata azureVMMetadata) {
    this.clientTelemetryInfo.setApplicationRegion(azureVMMetadata.getLocation());

    // Only replace the machine id when the metadata actually carries a VM id; otherwise
    // keep the id assigned earlier instead of publishing "vmId:" or "vmId:null".
    String vmId = azureVMMetadata.getVmId();
    if (vmId != null && !vmId.isEmpty()) {
        this.clientTelemetryInfo.setMachineId("vmId:" + vmId);
    }

    this.clientTelemetryInfo.setHostEnvInfo(azureVMMetadata.getOsType() + "|" + azureVMMetadata.getSku() +
        "|" + azureVMMetadata.getVmSize() + "|" + azureVMMetadata.getAzEnvironment());
}

private void loadAzureVmMetaData() {
AzureVMMetadata metadataSnapshot = azureVmMetaDataSingleton.get();

if (metadataSnapshot != null) {
this.populateAzureVmMetaData(metadataSnapshot);
return;
}

URI targetEndpoint = null;
try {
targetEndpoint = new URI(AZURE_VM_METADATA);
Expand All @@ -253,16 +299,16 @@ private void loadAzureVmMetaData() {
HttpRequest httpRequest = new HttpRequest(HttpMethod.GET, targetEndpoint, targetEndpoint.getPort(),
httpHeaders);
Mono<HttpResponse> httpResponseMono = this.httpClient.send(httpRequest);
httpResponseMono.flatMap(response -> response.bodyAsString()).map(metadataJson -> parse(metadataJson,
AzureVMMetadata.class)).doOnSuccess(azureVMMetadata -> {
this.clientTelemetryInfo.setApplicationRegion(azureVMMetadata.getLocation());
this.clientTelemetryInfo.setHostEnvInfo(azureVMMetadata.getOsType() + "|" + azureVMMetadata.getSku() +
"|" + azureVMMetadata.getVmSize() + "|" + azureVMMetadata.getAzEnvironment());
}).onErrorResume(throwable -> {
logger.info("Client is not on azure vm");
logger.debug("Unable to get azure vm metadata", throwable);
return Mono.empty();
}).subscribe();
httpResponseMono
.flatMap(response -> response.bodyAsString()).map(metadataJson -> parse(metadataJson,
AzureVMMetadata.class)).doOnSuccess(metadata -> {
azureVmMetaDataSingleton.compareAndSet(null, metadata);
this.populateAzureVmMetaData(metadata);
}).onErrorResume(throwable -> {
logger.info("Client is not on azure vm");
logger.debug("Unable to get azure vm metadata", throwable);
return Mono.empty();
}).subscribe();
}

private static <T> T parse(String itemResponseBodyAsString, Class<T> itemClassType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
@JsonSerialize(using = ClientTelemetrySerializer.class)
public class ClientTelemetryInfo {
private String timeStamp;
private String machineId;
private String clientId;
private String processId;
private String userAgent;
Expand All @@ -28,7 +29,8 @@ public class ClientTelemetryInfo {
private Map<ReportPayload, ConcurrentDoubleHistogram> cacheRefreshInfoMap;
private Map<ReportPayload, ConcurrentDoubleHistogram> operationInfoMap;

public ClientTelemetryInfo(String clientId,
public ClientTelemetryInfo(String machineId,
String clientId,
String processId,
String userAgent,
ConnectionMode connectionMode,
Expand All @@ -37,6 +39,7 @@ public ClientTelemetryInfo(String clientId,
String hostEnvInfo,
Boolean acceleratedNetworking,
List<String> preferredRegions) {
this.machineId = machineId;
this.clientId = clientId;
this.processId = processId;
this.userAgent = userAgent;
Expand Down Expand Up @@ -108,6 +111,14 @@ public void setApplicationRegion(String applicationRegion) {
this.applicationRegion = applicationRegion;
}

/**
 * Returns the machine identifier held by this telemetry info.
 *
 * @return the current machine identifier
 */
public String getMachineId() {
    return this.machineId;
}

/**
 * Replaces the machine identifier held by this telemetry info.
 *
 * @param machineId the new machine identifier
 */
public void setMachineId(String machineId) {
this.machineId = machineId;
}

/**
 * Returns the host environment description held by this telemetry info.
 *
 * @return the current host environment info
 */
public String getHostEnvInfo() {
    return this.hostEnvInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class ClientTelemetrySerializer extends StdSerializer<ClientTelemetryInfo
public void serialize(ClientTelemetryInfo telemetry, JsonGenerator generator, SerializerProvider serializerProvider) throws IOException {
generator.writeStartObject();
generator.writeStringField("timeStamp", telemetry.getTimeStamp());
generator.writeStringField("machineId", telemetry.getMachineId());
generator.writeStringField("clientId", telemetry.getClientId());

if (telemetry.getProcessId() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ public class StoreClientFactory implements AutoCloseable {
private final Configs configs;
private final TransportClient transportClient;
private volatile boolean isClosed;
private final ClientTelemetry clientTelemetry;

public StoreClientFactory(
IAddressResolver addressResolver,
Expand All @@ -34,7 +33,6 @@ public StoreClientFactory(
ClientTelemetry clientTelemetry) {

this.configs = configs;
this.clientTelemetry = clientTelemetry;
Protocol protocol = configs.getProtocol();
if (enableTransportClientSharing) {
this.transportClient = SharedTransportClient.getOrCreateInstance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

package com.azure.cosmos.implementation.directconnectivity.rntbd;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.clienttelemetry.ReportPayload;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ public void clientTelemetryWithStageJunoEndpoint() throws InterruptedException,
String databaseId = UUID.randomUUID().toString();
try {
String whiteListedAccountForTelemetry = System.getProperty("COSMOS.CLIENT_TELEMETRY_COSMOS_ACCOUNT");
assertThat(whiteListedAccountForTelemetry).isNotNull();
String[] credentialList = whiteListedAccountForTelemetry.split(";");
String host = credentialList[0].substring("AccountEndpoint=".length());
String key = credentialList[1].substring("AccountKey=".length());
Expand Down Expand Up @@ -295,7 +296,9 @@ public void clientTelemetryWithStageJunoEndpoint() throws InterruptedException,
return httpResponse.statusCode() == HttpConstants.StatusCodes.OK;
}).verifyComplete();
} finally {
cosmosClient.getDatabase(databaseId).delete();
if (cosmosClient != null) {
cosmosClient.getDatabase(databaseId).delete();
}
safeCloseSyncClient(cosmosClient);
}
}
Expand Down
Loading

0 comments on commit 427d396

Please sign in to comment.