From 56360a8137d84ce0a3ab8e6dae2ecb6285d016a7 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Fri, 21 Aug 2020 11:22:14 +0900 Subject: [PATCH] Implement Armeria instrumentation context propagation. (#979) * Implement Armeria instrumentation context propagation. * Doc and cleanup * Update * Clean --- ...meriaNoDuplicateInstrumentationTest.groovy | 13 +- .../auto/armeria/v1_0/ArmeriaTest.groovy | 13 +- .../grpc/override/ContextStorageOverride.java | 44 ++++++ .../common/ArmeriaContextStorageOverride.java | 106 ++++++++++++++ .../armeria/v1_0/ArmeriaTest.groovy | 6 +- .../armeria/v1_0/AbstractArmeriaTest.groovy | 137 +++++++++++++++++- 6 files changed, 309 insertions(+), 10 deletions(-) create mode 100644 instrumentation/armeria-1.0/library/src/main/java/io/grpc/override/ContextStorageOverride.java create mode 100644 instrumentation/armeria-1.0/library/src/main/java/io/opentelemetry/instrumentation/armeria/v1_0/common/ArmeriaContextStorageOverride.java diff --git a/instrumentation/armeria-1.0/auto/src/test/groovy/io/opentelemetry/instrumentation/auto/armeria/v1_0/ArmeriaNoDuplicateInstrumentationTest.groovy b/instrumentation/armeria-1.0/auto/src/test/groovy/io/opentelemetry/instrumentation/auto/armeria/v1_0/ArmeriaNoDuplicateInstrumentationTest.groovy index de7bb0ba8046..718b15d4fe8c 100644 --- a/instrumentation/armeria-1.0/auto/src/test/groovy/io/opentelemetry/instrumentation/auto/armeria/v1_0/ArmeriaNoDuplicateInstrumentationTest.groovy +++ b/instrumentation/armeria-1.0/auto/src/test/groovy/io/opentelemetry/instrumentation/auto/armeria/v1_0/ArmeriaNoDuplicateInstrumentationTest.groovy @@ -35,10 +35,19 @@ class ArmeriaNoDuplicateInstrumentationTest extends AbstractArmeriaTest implemen } def childSetupSpec() { - server.before() + backend.before() + frontend.before() } def childCleanupSpec() { - server.after() + backend.after() + frontend.after() + } + + // Because our unit tests include io.grpc.Context in the bootstrap classloader, there does not + // seem to be a simple way of testing this yet. + @Override + boolean supportsContext() { + return false } } diff --git a/instrumentation/armeria-1.0/auto/src/test/groovy/io/opentelemetry/instrumentation/auto/armeria/v1_0/ArmeriaTest.groovy b/instrumentation/armeria-1.0/auto/src/test/groovy/io/opentelemetry/instrumentation/auto/armeria/v1_0/ArmeriaTest.groovy index de65b65591f7..9c50a61c6e89 100644 --- a/instrumentation/armeria-1.0/auto/src/test/groovy/io/opentelemetry/instrumentation/auto/armeria/v1_0/ArmeriaTest.groovy +++ b/instrumentation/armeria-1.0/auto/src/test/groovy/io/opentelemetry/instrumentation/auto/armeria/v1_0/ArmeriaTest.groovy @@ -33,10 +33,19 @@ class ArmeriaTest extends AbstractArmeriaTest implements AgentTestTrait { } def childSetupSpec() { - server.before() + backend.before() + frontend.before() } def childCleanupSpec() { - server.after() + backend.after() + frontend.after() + } + + // Because our unit tests include io.grpc.Context in the bootstrap classloader, there does not + // seem to be a simple way of testing this yet. + @Override + boolean supportsContext() { + return false } } diff --git a/instrumentation/armeria-1.0/library/src/main/java/io/grpc/override/ContextStorageOverride.java b/instrumentation/armeria-1.0/library/src/main/java/io/grpc/override/ContextStorageOverride.java new file mode 100644 index 000000000000..fb587c75b17c --- /dev/null +++ b/instrumentation/armeria-1.0/library/src/main/java/io/grpc/override/ContextStorageOverride.java @@ -0,0 +1,44 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.override; + +import io.grpc.Context; +import io.grpc.Context.Storage; +import io.opentelemetry.instrumentation.armeria.v1_0.common.ArmeriaContextStorageOverride; + +/** + * Initialization point for overriding {@link Storage} with {@link ArmeriaContextStorageOverride}. + */ +public class ContextStorageOverride extends Storage { + + private static final ArmeriaContextStorageOverride DELEGATE = new ArmeriaContextStorageOverride(); + + @Override + public Context doAttach(Context toAttach) { + return DELEGATE.doAttach(toAttach); + } + + @Override + public void detach(Context toDetach, Context toRestore) { + DELEGATE.detach(toDetach, toRestore); + } + + @Override + public Context current() { + return DELEGATE.current(); + } +} diff --git a/instrumentation/armeria-1.0/library/src/main/java/io/opentelemetry/instrumentation/armeria/v1_0/common/ArmeriaContextStorageOverride.java b/instrumentation/armeria-1.0/library/src/main/java/io/opentelemetry/instrumentation/armeria/v1_0/common/ArmeriaContextStorageOverride.java new file mode 100644 index 000000000000..f91fa9174194 --- /dev/null +++ b/instrumentation/armeria-1.0/library/src/main/java/io/opentelemetry/instrumentation/armeria/v1_0/common/ArmeriaContextStorageOverride.java @@ -0,0 +1,106 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.instrumentation.armeria.v1_0.common; + +import com.linecorp.armeria.common.RequestContext; +import io.grpc.Context; +import io.grpc.Context.Storage; +import io.netty.util.AttributeKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link Storage} override of the gRPC context that uses an Armeria {@link RequestContext} as + * backing storage when available. Armeria provides idioms for propagating {@link RequestContext} + * which are inherently tied to request processing, so by using it for our storage as well, we can + * make sure a {@link io.opentelemetry.trace.Span} is propagated throughout the lifecycle of the + * request with it. + * + *

Contexts created when an Armeria context is not available follow the same pattern as the gRPC + * default of using a normal {@link ThreadLocal}. + * + *

This class is automatically picked up by gRPC by {@link + * io.grpc.override.ContextStorageOverride} provided in this artifact. If you use your own + * implementation to customize storage, you should generally delegate to this class to ensure + * context propagates with the Armeria context. + */ +public class ArmeriaContextStorageOverride extends Storage { + + private static final Logger logger = LoggerFactory.getLogger(ArmeriaContextStorageOverride.class); + + private static final ThreadLocal LOCAL_CONTEXT = new ThreadLocal<>(); + + private static final AttributeKey CONTEXT = + AttributeKey.valueOf(ArmeriaContextStorageOverride.class, "CONTEXT"); + + @Override + public Context doAttach(Context toAttach) { + RequestContext armeriaCtx = RequestContext.currentOrNull(); + Context current = current(armeriaCtx); + if (armeriaCtx != null) { + armeriaCtx.setAttr(CONTEXT, toAttach); + } else { + LOCAL_CONTEXT.set(toAttach); + } + return current; + } + + @Override + public void detach(Context toDetach, Context toRestore) { + RequestContext armeriaCtx = RequestContext.currentOrNull(); + Context current = current(armeriaCtx); + if (current != toDetach) { + // Log a warning instead of throwing an exception as the context to attach is assumed + // to be the correct one and the unbalanced state represents a coding mistake in a lower + // layer in the stack that cannot be recovered from here. + if (logger.isWarnEnabled()) { + logger.warn("Context was not attached when detaching", new Throwable().fillInStackTrace()); + } + } + + if (toRestore == Context.ROOT) { + toRestore = null; + } + if (armeriaCtx != null) { + // We do not ever restore the ROOT context when in the context of an Armeria request. The + // context's lifecycle is effectively bound to the request, even through asynchronous flows, + // so we do not ever need to clear it explicitly. It will disappear along with the request + // when it's done. + if (toRestore != null) { + armeriaCtx.setAttr(CONTEXT, toRestore); + } + } else { + LOCAL_CONTEXT.set(toRestore); + } + } + + @Override + public Context current() { + RequestContext armeriaCtx = RequestContext.currentOrNull(); + return current(armeriaCtx); + } + + private static Context current(RequestContext armeriaCtx) { + final Context current; + if (armeriaCtx != null) { + current = armeriaCtx.attr(CONTEXT); + } else { + current = LOCAL_CONTEXT.get(); + } + return current != null ? current : Context.ROOT; + } +} diff --git a/instrumentation/armeria-1.0/library/src/test/groovy/io/opentelemetry/instrumentation/armeria/v1_0/ArmeriaTest.groovy b/instrumentation/armeria-1.0/library/src/test/groovy/io/opentelemetry/instrumentation/armeria/v1_0/ArmeriaTest.groovy index 722e742b52c3..7890424ace1f 100644 --- a/instrumentation/armeria-1.0/library/src/test/groovy/io/opentelemetry/instrumentation/armeria/v1_0/ArmeriaTest.groovy +++ b/instrumentation/armeria-1.0/library/src/test/groovy/io/opentelemetry/instrumentation/armeria/v1_0/ArmeriaTest.groovy @@ -34,10 +34,12 @@ class ArmeriaTest extends AbstractArmeriaTest implements InstrumentationTestTrai } def childSetupSpec() { - server.before() + backend.before() + frontend.before() } def cleanupSpec() { - server.after() + backend.after() + frontend.after() } } diff --git a/instrumentation/armeria-1.0/testing/src/main/groovy/io/opentelemetry/instrumentation/armeria/v1_0/AbstractArmeriaTest.groovy b/instrumentation/armeria-1.0/testing/src/main/groovy/io/opentelemetry/instrumentation/armeria/v1_0/AbstractArmeriaTest.groovy index 6e646d6fcdc2..3d6a64ee66e2 100644 --- a/instrumentation/armeria-1.0/testing/src/main/groovy/io/opentelemetry/instrumentation/armeria/v1_0/AbstractArmeriaTest.groovy +++ b/instrumentation/armeria-1.0/testing/src/main/groovy/io/opentelemetry/instrumentation/armeria/v1_0/AbstractArmeriaTest.groovy @@ -18,6 +18,7 @@ package io.opentelemetry.instrumentation.armeria.v1_0 import static io.opentelemetry.trace.Span.Kind.CLIENT import static io.opentelemetry.trace.Span.Kind.SERVER +import static org.junit.Assume.assumeTrue import com.linecorp.armeria.client.WebClient import com.linecorp.armeria.client.WebClientBuilder @@ -29,6 +30,7 @@ import com.linecorp.armeria.server.ServerBuilder import com.linecorp.armeria.testing.junit4.server.ServerRule import io.opentelemetry.auto.test.InstrumentationSpecification import io.opentelemetry.trace.attributes.SemanticAttributes +import java.util.concurrent.CompletableFuture import spock.lang.Shared import spock.lang.Unroll @@ -42,18 +44,41 @@ abstract class AbstractArmeriaTest extends InstrumentationSpecification { // We cannot annotate with @ClassRule since then Armeria will be class loaded before bytecode // instrumentation is set up by the Spock trait. @Shared - protected ServerRule server = new ServerRule() { + protected ServerRule backend = new ServerRule() { @Override protected void configure(ServerBuilder sb) throws Exception { sb = configureServer(sb) + sb.service("/hello", { ctx, req -> HttpResponse.of(HttpStatus.OK) }) + } + } + + // We cannot annotate with @ClassRule since then Armeria will be class loaded before bytecode + // instrumentation is set up by the Spock trait. + @Shared + protected ServerRule frontend = new ServerRule() { + @Override + protected void configure(ServerBuilder sb) throws Exception { + sb = configureServer(sb) + def backendClient = configureClient(WebClient.builder(backend.httpUri())).build() + sb.service("/exact", { ctx, req -> HttpResponse.of(HttpStatus.OK) }) sb.service("/items/{itemsId}", { ctx, req -> HttpResponse.of(HttpStatus.OK) }) sb.service("/httperror", { ctx, req -> HttpResponse.of(HttpStatus.NOT_IMPLEMENTED) }) sb.service("/exception", { ctx, req -> throw new IllegalStateException("illegal") }) + sb.service("/async", { ctx, req -> + def executor = ctx.eventLoop() + CompletableFuture resp = backendClient.get("/hello").aggregate(executor) + .thenComposeAsync({ unused -> + backendClient.get("/hello").aggregate() + }, executor) + .thenApplyAsync({ unused -> HttpResponse.of(HttpStatus.OK)}, executor) + + return HttpResponse.from(resp) + }) } } - def client = configureClient(WebClient.builder(server.httpUri())).build() + def client = configureClient(WebClient.builder(frontend.httpUri())).build() def "HTTP #method #path"() { when: @@ -73,7 +98,7 @@ abstract class AbstractArmeriaTest extends InstrumentationSpecification { // TODO(anuraaga): peer name shouldn't be set to IP "${SemanticAttributes.NET_PEER_NAME.key()}" "127.0.0.1" "${SemanticAttributes.NET_PEER_PORT.key()}" Long - "${SemanticAttributes.HTTP_URL.key()}" "${server.httpUri()}${path}" + "${SemanticAttributes.HTTP_URL.key()}" "${frontend.httpUri()}${path}" "${SemanticAttributes.HTTP_METHOD.key()}" method.name() "${SemanticAttributes.HTTP_STATUS_CODE.key()}" code } @@ -89,7 +114,7 @@ abstract class AbstractArmeriaTest extends InstrumentationSpecification { attributes { "${SemanticAttributes.NET_PEER_IP.key()}" "127.0.0.1" "${SemanticAttributes.NET_PEER_PORT.key()}" Long - "${SemanticAttributes.HTTP_URL.key()}" "${server.httpUri()}${path}" + "${SemanticAttributes.HTTP_URL.key()}" "${frontend.httpUri()}${path}" "${SemanticAttributes.HTTP_METHOD.key()}" method.name() "${SemanticAttributes.HTTP_STATUS_CODE.key()}" code "${SemanticAttributes.HTTP_FLAVOR.key()}" "h2c" @@ -108,4 +133,108 @@ abstract class AbstractArmeriaTest extends InstrumentationSpecification { "/httperror" | "/httperror" | HttpMethod.GET | 501 "/exception" | "/exception" | HttpMethod.GET | 500 } + + def "context propagated by executor"() { + when: + assumeTrue(supportsContext()) + def response = client.get("/async").aggregate().join() + + then: + response.status().code() == 200 + assertTraces(1) { + trace(0, 6) { + span(0) { + operationName("HTTP GET") + spanKind CLIENT + parent() + attributes { + "${SemanticAttributes.NET_PEER_IP.key()}" "127.0.0.1" + // TODO(anuraaga): peer name shouldn't be set to IP + "${SemanticAttributes.NET_PEER_NAME.key()}" "127.0.0.1" + "${SemanticAttributes.NET_PEER_PORT.key()}" Long + "${SemanticAttributes.HTTP_URL.key()}" "${frontend.httpUri()}/async" + "${SemanticAttributes.HTTP_METHOD.key()}" "GET" + "${SemanticAttributes.HTTP_STATUS_CODE.key()}" 200 + } + } + span(1) { + operationName("/async") + spanKind SERVER + childOf span(0) + attributes { + "${SemanticAttributes.NET_PEER_IP.key()}" "127.0.0.1" + "${SemanticAttributes.NET_PEER_PORT.key()}" Long + "${SemanticAttributes.HTTP_URL.key()}" "${frontend.httpUri()}/async" + "${SemanticAttributes.HTTP_METHOD.key()}" "GET" + "${SemanticAttributes.HTTP_STATUS_CODE.key()}" 200 + "${SemanticAttributes.HTTP_FLAVOR.key()}" "h2c" + "${SemanticAttributes.HTTP_USER_AGENT.key()}" String + "${SemanticAttributes.HTTP_CLIENT_IP.key()}" "127.0.0.1" + } + } + span(2) { + operationName("HTTP GET") + spanKind CLIENT + childOf span(1) + attributes { + "${SemanticAttributes.NET_PEER_IP.key()}" "127.0.0.1" + // TODO(anuraaga): peer name shouldn't be set to IP + "${SemanticAttributes.NET_PEER_NAME.key()}" "127.0.0.1" + "${SemanticAttributes.NET_PEER_PORT.key()}" Long + "${SemanticAttributes.HTTP_URL.key()}" "${backend.httpUri()}/hello" + "${SemanticAttributes.HTTP_METHOD.key()}" "GET" + "${SemanticAttributes.HTTP_STATUS_CODE.key()}" 200 + } + } + span(3) { + operationName("/hello") + spanKind SERVER + childOf span(2) + attributes { + "${SemanticAttributes.NET_PEER_IP.key()}" "127.0.0.1" + "${SemanticAttributes.NET_PEER_PORT.key()}" Long + "${SemanticAttributes.HTTP_URL.key()}" "${backend.httpUri()}/hello" + "${SemanticAttributes.HTTP_METHOD.key()}" "GET" + "${SemanticAttributes.HTTP_STATUS_CODE.key()}" 200 + "${SemanticAttributes.HTTP_FLAVOR.key()}" "h2c" + "${SemanticAttributes.HTTP_USER_AGENT.key()}" String + "${SemanticAttributes.HTTP_CLIENT_IP.key()}" "127.0.0.1" + } + } + span(4) { + operationName("HTTP GET") + spanKind CLIENT + childOf span(1) + attributes { + "${SemanticAttributes.NET_PEER_IP.key()}" "127.0.0.1" + // TODO(anuraaga): peer name shouldn't be set to IP + "${SemanticAttributes.NET_PEER_NAME.key()}" "127.0.0.1" + "${SemanticAttributes.NET_PEER_PORT.key()}" Long + "${SemanticAttributes.HTTP_URL.key()}" "${backend.httpUri()}/hello" + "${SemanticAttributes.HTTP_METHOD.key()}" "GET" + "${SemanticAttributes.HTTP_STATUS_CODE.key()}" 200 + } + } + span(5) { + operationName("/hello") + spanKind SERVER + childOf span(4) + attributes { + "${SemanticAttributes.NET_PEER_IP.key()}" "127.0.0.1" + "${SemanticAttributes.NET_PEER_PORT.key()}" Long + "${SemanticAttributes.HTTP_URL.key()}" "${backend.httpUri()}/hello" + "${SemanticAttributes.HTTP_METHOD.key()}" "GET" + "${SemanticAttributes.HTTP_STATUS_CODE.key()}" 200 + "${SemanticAttributes.HTTP_FLAVOR.key()}" "h2c" + "${SemanticAttributes.HTTP_USER_AGENT.key()}" String + "${SemanticAttributes.HTTP_CLIENT_IP.key()}" "127.0.0.1" + } + } + } + } + } + + boolean supportsContext() { + return true + } }