Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Azure resource providers #1228

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Prev Previous commit
Next Next commit
add aks resource provider
zeitlinger committed Dec 12, 2024
commit 43ba1aef31d2f3ff09e1ef8976dea58bde724862
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.azure.resource;

import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.semconv.incubating.CloudIncubatingAttributes;
import io.opentelemetry.semconv.incubating.K8sIncubatingAttributes;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

public class AzureAksResourceProvider extends CloudResourceProvider {

private static final Map<String, AzureVmResourceProvider.Entry> COMPUTE_MAPPING = new HashMap<>();

static {
COMPUTE_MAPPING.put(
"resourceGroupName",
new AzureVmResourceProvider.Entry(
K8sIncubatingAttributes.K8S_CLUSTER_NAME, AzureAksResourceProvider::parseClusterName));
}

// visible for testing
static String parseClusterName(String resourceGroup) {
// Code inspired by
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/datadogexporter/internal/hostmetadata/internal/azure/provider.go#L36
String[] splitAll = resourceGroup.split("_");
if (splitAll.length == 4 && splitAll[0].equalsIgnoreCase("mc")) {
return splitAll[splitAll.length - 2];
}
return resourceGroup;
}

// Environment variable that is set when running on Kubernetes
static final String KUBERNETES_SERVICE_HOST = "KUBERNETES_SERVICE_HOST";
private final Supplier<Optional<String>> client;
private final Map<String, String> environment;

// SPI
public AzureAksResourceProvider() {
this(AzureMetadataService.defaultClient(), System.getenv());
}

// visible for testing
public AzureAksResourceProvider(
zeitlinger marked this conversation as resolved.
Show resolved Hide resolved
Supplier<Optional<String>> client, Map<String, String> environment) {
this.client = client;
this.environment = environment;
}

@Override
public int order() {
// run after the fast cloud resource providers that only check environment variables
// and before the AKS provider
return 100;
}

@Override
public Resource createResource(ConfigProperties configProperties) {
if (environment.get(KUBERNETES_SERVICE_HOST) == null) {
return Resource.empty();
}
return client
.get()
.map(
body ->
AzureVmResourceProvider.parseMetadata(
body, COMPUTE_MAPPING, CloudIncubatingAttributes.CloudPlatformValues.AZURE_AKS))
.orElse(Resource.empty());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.azure.resource;

import com.fasterxml.jackson.core.JsonFactory;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class AzureMetadataService {
static final JsonFactory JSON_FACTORY = new JsonFactory();
private static final URL METADATA_URL;

static {
try {
METADATA_URL = new URL("http://169.254.169.254/metadata/instance?api-version=2021-02-01");
} catch (MalformedURLException e) {
throw new IllegalStateException(e);
}
}

private AzureMetadataService() {}

private static final Duration TIMEOUT = Duration.ofSeconds(1);

private static final Logger logger = Logger.getLogger(AzureMetadataService.class.getName());

static Supplier<Optional<String>> defaultClient() {
return () -> fetchMetadata(METADATA_URL);
}

// visible for testing
static Optional<String> fetchMetadata(URL url) {
OkHttpClient client =
new OkHttpClient.Builder()
.callTimeout(TIMEOUT)
.connectTimeout(TIMEOUT)
.readTimeout(TIMEOUT)
.build();

Request request = new Request.Builder().url(url).get().addHeader("Metadata", "true").build();

try (Response response = client.newCall(request).execute()) {
int responseCode = response.code();
if (responseCode != 200) {
logger.log(
Level.FINE,
"Error response from "
+ url
+ " code ("
+ responseCode
+ ") text "
+ response.message());
return Optional.empty();
}

return Optional.of(Objects.requireNonNull(response.body()).string());
} catch (IOException e) {
logger.log(Level.FINE, "Failed to fetch Azure VM metadata", e);
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
@@ -15,7 +15,6 @@
import static io.opentelemetry.semconv.incubating.OsIncubatingAttributes.OS_TYPE;
import static io.opentelemetry.semconv.incubating.OsIncubatingAttributes.OS_VERSION;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import io.opentelemetry.api.common.AttributeKey;
@@ -25,58 +24,54 @@
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.semconv.incubating.CloudIncubatingAttributes;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.jetbrains.annotations.NotNull;

public class AzureVmResourceProvider extends CloudResourceProvider {

private static final Map<String, AttributeKey<String>> COMPUTE_MAPPING = new HashMap<>();
static class Entry {
final AttributeKey<String> key;
final Function<String, String> transform;

static {
COMPUTE_MAPPING.put("location", CLOUD_REGION);
COMPUTE_MAPPING.put("resourceId", CLOUD_RESOURCE_ID);
COMPUTE_MAPPING.put("vmId", HOST_ID);
COMPUTE_MAPPING.put("name", HOST_NAME);
COMPUTE_MAPPING.put("vmSize", HOST_TYPE);
COMPUTE_MAPPING.put("osType", OS_TYPE);
COMPUTE_MAPPING.put("version", OS_VERSION);
COMPUTE_MAPPING.put("vmScaleSetName", AttributeKey.stringKey("azure.vm.scaleset.name"));
COMPUTE_MAPPING.put("sku", AttributeKey.stringKey("azure.vm.sku"));
}

private static final JsonFactory JSON_FACTORY = new JsonFactory();
Entry(AttributeKey<String> key) {
this(key, Function.identity());
}

private static final Duration TIMEOUT = Duration.ofSeconds(1);
Entry(AttributeKey<String> key, Function<String, String> transform) {
this.key = key;
this.transform = transform;
}
}

private static final Logger logger = Logger.getLogger(AzureVmResourceProvider.class.getName());
private static final URL METADATA_URL;
private static final Map<String, Entry> COMPUTE_MAPPING = new HashMap<>();

static {
try {
METADATA_URL = new URL("http://169.254.169.254/metadata/instance?api-version=2021-02-01");
} catch (MalformedURLException e) {
throw new IllegalStateException(e);
}
COMPUTE_MAPPING.put("location", new Entry(CLOUD_REGION));
COMPUTE_MAPPING.put("resourceId", new Entry(CLOUD_RESOURCE_ID));
COMPUTE_MAPPING.put("vmId", new Entry(HOST_ID));
COMPUTE_MAPPING.put("name", new Entry(HOST_NAME));
COMPUTE_MAPPING.put("vmSize", new Entry(HOST_TYPE));
COMPUTE_MAPPING.put("osType", new Entry(OS_TYPE));
COMPUTE_MAPPING.put("version", new Entry(OS_VERSION));
COMPUTE_MAPPING.put(
"vmScaleSetName", new Entry(AttributeKey.stringKey("azure.vm.scaleset.name")));
COMPUTE_MAPPING.put("sku", new Entry(AttributeKey.stringKey("azure.vm.sku")));
}

private static final Logger logger = Logger.getLogger(AzureVmResourceProvider.class.getName());

private final Supplier<Optional<String>> client;

// SPI
public AzureVmResourceProvider() {
this(() -> fetchMetadata(METADATA_URL));
this(AzureMetadataService.defaultClient());
}

// visible for testing
@@ -87,20 +82,26 @@ public AzureVmResourceProvider(Supplier<Optional<String>> client) {
@Override
public int order() {
// run after the fast cloud resource providers that only check environment variables
// and after the AKS provider
return 100;
}

@Override
public Resource createResource(ConfigProperties config) {
return client.get().map(AzureVmResourceProvider::parseMetadata).orElse(Resource.empty());
return client
.get()
.map(
body ->
parseMetadata(
body, COMPUTE_MAPPING, CloudIncubatingAttributes.CloudPlatformValues.AZURE_VM))
.orElse(Resource.empty());
}

private static Resource parseMetadata(String body) {
AttributesBuilder builder =
azureAttributeBuilder(CloudIncubatingAttributes.CloudPlatformValues.AZURE_VM);
try (JsonParser parser = JSON_FACTORY.createParser(body)) {
static Resource parseMetadata(String body, Map<String, Entry> computeMapping, String platform) {
AttributesBuilder builder = azureAttributeBuilder(platform);
try (JsonParser parser = AzureMetadataService.JSON_FACTORY.createParser(body)) {
parser.nextToken();
parseResponse(parser, builder);
parseResponse(parser, builder, computeMapping);
} catch (IOException e) {
logger.log(Level.FINE, "Can't get Azure VM metadata", e);
}
@@ -115,7 +116,9 @@ static AttributesBuilder azureAttributeBuilder(String platform) {
return builder;
}

static void parseResponse(JsonParser parser, AttributesBuilder builder) throws IOException {
static void parseResponse(
JsonParser parser, AttributesBuilder builder, Map<String, Entry> computeMapping)
throws IOException {
if (!parser.isExpectedStartObjectToken()) {
logger.log(Level.FINE, "Couldn't parse ECS metadata, invalid JSON");
return;
@@ -126,7 +129,7 @@ static void parseResponse(JsonParser parser, AttributesBuilder builder) throws I
(name, value) -> {
try {
if (name.equals("compute")) {
consumeCompute(parser, builder);
consumeCompute(parser, builder, computeMapping);
} else {
parser.skipChildren();
}
@@ -136,14 +139,15 @@ static void parseResponse(JsonParser parser, AttributesBuilder builder) throws I
});
}

private static void consumeCompute(JsonParser parser, AttributesBuilder builder)
private static void consumeCompute(
JsonParser parser, AttributesBuilder builder, Map<String, Entry> computeMapping)
throws IOException {
consumeJson(
parser,
(computeName, computeValue) -> {
AttributeKey<String> key = COMPUTE_MAPPING.get(computeName);
if (key != null) {
builder.put(key, computeValue);
Entry entry = computeMapping.get(computeName);
if (entry != null) {
builder.put(entry.key, entry.transform.apply(computeValue));
} else {
try {
parser.skipChildren();
@@ -160,36 +164,4 @@ private static void consumeJson(JsonParser parser, BiConsumer<String, String> co
consumer.accept(parser.currentName(), parser.nextTextValue());
}
}

// visible for testing
static Optional<String> fetchMetadata(URL url) {
OkHttpClient client =
new OkHttpClient.Builder()
.callTimeout(TIMEOUT)
.connectTimeout(TIMEOUT)
.readTimeout(TIMEOUT)
.build();

Request request = new Request.Builder().url(url).get().addHeader("Metadata", "true").build();

try (Response response = client.newCall(request).execute()) {
int responseCode = response.code();
if (responseCode != 200) {
logger.log(
Level.FINE,
"Error response from "
+ url
+ " code ("
+ responseCode
+ ") text "
+ response.message());
return Optional.empty();
}

return Optional.of(Objects.requireNonNull(response.body()).string());
} catch (IOException e) {
logger.log(Level.FINE, "Failed to fetch Azure VM metadata", e);
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.azure.resource;

import static io.opentelemetry.semconv.incubating.CloudIncubatingAttributes.CLOUD_PLATFORM;
import static io.opentelemetry.semconv.incubating.CloudIncubatingAttributes.CLOUD_PROVIDER;
import static org.assertj.core.api.Assertions.assertThat;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.autoconfigure.spi.ResourceProvider;
import io.opentelemetry.sdk.testing.assertj.AttributesAssert;
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
import io.opentelemetry.semconv.incubating.CloudIncubatingAttributes;
import io.opentelemetry.semconv.incubating.K8sIncubatingAttributes;
import java.util.Collections;
import java.util.Optional;
import java.util.function.Supplier;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;

class AzureAksResourceProviderTest extends MetadataBasedResourceProviderTest {

@NotNull
@Override
protected ResourceProvider getResourceProvider(Supplier<Optional<String>> client) {
return new AzureAksResourceProvider(
client,
Collections.singletonMap(AzureAksResourceProvider.KUBERNETES_SERVICE_HOST, "localhost"));
}

@Override
protected String getPlatform() {
return CloudIncubatingAttributes.CloudPlatformValues.AZURE_AKS;
}

@Override
protected void assertDefaultAttributes(AttributesAssert attributesAssert) {
attributesAssert
.containsEntry(CLOUD_PROVIDER, "azure")
.containsEntry(CLOUD_PLATFORM, CloudIncubatingAttributes.CloudPlatformValues.AZURE_AKS)
.containsEntry(K8sIncubatingAttributes.K8S_CLUSTER_NAME, "macikgo-test-may-23");
}

@Test
void notOnK8s() {
AzureAksResourceProvider provider =
new AzureAksResourceProvider(() -> Optional.of(okResponse()), Collections.emptyMap());
Attributes attributes = provider.createResource(null).getAttributes();
OpenTelemetryAssertions.assertThat(attributes).isEmpty();
}

@Test
void parseClusterName() {
String clusterName =
AzureAksResourceProvider.parseClusterName(
"mc_macikgo-test-may-23_macikgo-test-may-23_eastus");
assertThat(clusterName).isEqualTo("macikgo-test-may-23");
}
}
Original file line number Diff line number Diff line change
@@ -15,86 +15,27 @@
import static io.opentelemetry.semconv.incubating.OsIncubatingAttributes.OS_TYPE;
import static io.opentelemetry.semconv.incubating.OsIncubatingAttributes.OS_VERSION;

import com.google.common.base.Charsets;
import com.google.common.io.CharStreams;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.testing.junit5.server.mock.MockWebServerExtension;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.autoconfigure.spi.ResourceProvider;
import io.opentelemetry.sdk.testing.assertj.AttributesAssert;
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
import io.opentelemetry.semconv.incubating.CloudIncubatingAttributes;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class AzureVmResourceProviderTest {

@RegisterExtension
public static final MockWebServerExtension server = new MockWebServerExtension();

@Test
void successFromFile() {
assertDefaultAttributes(createResource(() -> Optional.of(okResponse())));
}

@Test
void successFromMockServer() {
server.enqueue(HttpResponse.of(MediaType.JSON, okResponse()));
assertDefaultAttributes(mockServerResponse());
}

@Test
void responseNotFound() {
server.enqueue(HttpResponse.of(HttpStatus.NOT_FOUND));
mockServerResponse().isEmpty();
}

@Test
void responseEmpty() {
server.enqueue(HttpResponse.of(""));
assertOnlyProvider(mockServerResponse());
}

@Test
void responseEmptyJson() {
server.enqueue(HttpResponse.of("{}"));
assertOnlyProvider(mockServerResponse());
}

class AzureVmResourceProviderTest extends MetadataBasedResourceProviderTest {
@NotNull
private static AttributesAssert mockServerResponse() {
return createResource(
() -> {
try {
return AzureVmResourceProvider.fetchMetadata(server.httpUri().toURL());
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
});
@Override
protected ResourceProvider getResourceProvider(Supplier<Optional<String>> client) {
return new AzureVmResourceProvider(client);
}

@NotNull
private static AttributesAssert createResource(Supplier<Optional<String>> client) {
Resource resource = new AzureVmResourceProvider(client).createResource(null);
return OpenTelemetryAssertions.assertThat(resource.getAttributes());
@Override
protected String getPlatform() {
return CloudIncubatingAttributes.CloudPlatformValues.AZURE_VM;
}

private static void assertOnlyProvider(AttributesAssert attributesAssert) {
attributesAssert
.hasSize(2)
.containsEntry(CLOUD_PROVIDER, "azure")
.containsEntry(CLOUD_PLATFORM, CloudIncubatingAttributes.CloudPlatformValues.AZURE_VM);
}

private static void assertDefaultAttributes(AttributesAssert attributesAssert) {
@Override
protected void assertDefaultAttributes(AttributesAssert attributesAssert) {
attributesAssert
.containsEntry(CLOUD_PROVIDER, "azure")
.containsEntry(CLOUD_PLATFORM, CloudIncubatingAttributes.CloudPlatformValues.AZURE_VM)
@@ -110,18 +51,4 @@ private static void assertDefaultAttributes(AttributesAssert attributesAssert) {
.containsEntry("azure.vm.scaleset.name", "crpteste9vflji9")
.containsEntry("azure.vm.sku", "18.04-LTS");
}

private static String okResponse() {
try {
return CharStreams.toString(
new InputStreamReader(
Objects.requireNonNull(
AzureVmResourceProviderTest.class
.getClassLoader()
.getResourceAsStream("response.json")),
Charsets.UTF_8));
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.azure.resource;

import static io.opentelemetry.semconv.incubating.CloudIncubatingAttributes.CLOUD_PLATFORM;
import static io.opentelemetry.semconv.incubating.CloudIncubatingAttributes.CLOUD_PROVIDER;

import com.google.common.base.Charsets;
import com.google.common.io.CharStreams;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.testing.junit5.server.mock.MockWebServerExtension;
import io.opentelemetry.sdk.autoconfigure.spi.ResourceProvider;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.assertj.AttributesAssert;
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

public abstract class MetadataBasedResourceProviderTest {
@RegisterExtension
public static final MockWebServerExtension server = new MockWebServerExtension();

@NotNull
private AttributesAssert mockServerResponse() {
return createResource(
() -> {
try {
return AzureMetadataService.fetchMetadata(server.httpUri().toURL());
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
});
}

@NotNull
private AttributesAssert createResource(Supplier<Optional<String>> client) {
Resource resource = getResourceProvider(client).createResource(null);
return OpenTelemetryAssertions.assertThat(resource.getAttributes());
}

@NotNull
protected abstract ResourceProvider getResourceProvider(Supplier<Optional<String>> client);

private void assertOnlyProvider(AttributesAssert attributesAssert) {
attributesAssert
.hasSize(2)
.containsEntry(CLOUD_PROVIDER, "azure")
.containsEntry(CLOUD_PLATFORM, getPlatform());
}

protected abstract String getPlatform();

protected abstract void assertDefaultAttributes(AttributesAssert attributesAssert);

protected static String okResponse() {
try {
return CharStreams.toString(
new InputStreamReader(
Objects.requireNonNull(
AzureVmResourceProviderTest.class
.getClassLoader()
.getResourceAsStream("response.json")),
Charsets.UTF_8));
} catch (IOException e) {
throw new IllegalStateException(e);
}
}

@Test
public void successFromFile() {
assertDefaultAttributes(createResource(() -> Optional.of(okResponse())));
}

@Test
public void successFromMockServer() {
server.enqueue(HttpResponse.of(MediaType.JSON, okResponse()));
assertDefaultAttributes(mockServerResponse());
}

@Test
public void responseNotFound() {
server.enqueue(HttpResponse.of(HttpStatus.NOT_FOUND));
mockServerResponse().isEmpty();
}

@Test
public void responseEmpty() {
server.enqueue(HttpResponse.of(""));
assertOnlyProvider(mockServerResponse());
}

@Test
public void responseEmptyJson() {
server.enqueue(HttpResponse.of("{}"));
assertOnlyProvider(mockServerResponse());
}
}