Skip to content

Commit

Permalink
Implement Armeria instrumentation context propagation. (#979)
Browse files Browse the repository at this point in the history
* Implement Armeria instrumentation context propagation.

* Doc and cleanup

* Update

* Clean
  • Loading branch information
Anuraag Agrawal authored Aug 21, 2020
1 parent 753d996 commit 56360a8
Show file tree
Hide file tree
Showing 6 changed files with 309 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,19 @@ class ArmeriaNoDuplicateInstrumentationTest extends AbstractArmeriaTest implemen
}

def childSetupSpec() {
server.before()
backend.before()
frontend.before()
}

def childCleanupSpec() {
server.after()
backend.after()
frontend.after()
}

// Because our unit tests include io.grpc.Context in the bootstrap classloader, there does not
// seem to be a simple way of testing this yet.
@Override
boolean supportsContext() {
return false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,19 @@ class ArmeriaTest extends AbstractArmeriaTest implements AgentTestTrait {
}

def childSetupSpec() {
server.before()
backend.before()
frontend.before()
}

def childCleanupSpec() {
server.after()
backend.after()
frontend.after()
}

// Because our unit tests include io.grpc.Context in the bootstrap classloader, there does not
// seem to be a simple way of testing this yet.
@Override
boolean supportsContext() {
return false
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.override;

import io.grpc.Context;
import io.grpc.Context.Storage;
import io.opentelemetry.instrumentation.armeria.v1_0.common.ArmeriaContextStorageOverride;

/**
* Initialization point for overriding {@link Storage} with {@link ArmeriaContextStorageOverride}.
*/
public class ContextStorageOverride extends Storage {

private static final ArmeriaContextStorageOverride DELEGATE = new ArmeriaContextStorageOverride();

@Override
public Context doAttach(Context toAttach) {
return DELEGATE.doAttach(toAttach);
}

@Override
public void detach(Context toDetach, Context toRestore) {
DELEGATE.detach(toDetach, toRestore);
}

@Override
public Context current() {
return DELEGATE.current();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opentelemetry.instrumentation.armeria.v1_0.common;

import com.linecorp.armeria.common.RequestContext;
import io.grpc.Context;
import io.grpc.Context.Storage;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A {@link Storage} override of the gRPC context that uses an Armeria {@link RequestContext} as
* backing storage when available. Armeria provides idioms for propagating {@link RequestContext}
* which are inherently tied to request processing, so by using it for our storage as well, we can
* make sure a {@link io.opentelemetry.trace.Span} is propagated throughout the lifecycle of the
* request with it.
*
* <p>Contexts created when an Armeria context is not available follow the same pattern as the gRPC
* default of using a normal {@link ThreadLocal}.
*
* <p>This class is automatically picked up by gRPC by {@link
* io.grpc.override.ContextStorageOverride} provided in this artifact. If you use your own
* implementation to customize storage, you should generally delegate to this class to ensure
* context propagates with the Armeria context.
*/
public class ArmeriaContextStorageOverride extends Storage {

private static final Logger logger = LoggerFactory.getLogger(ArmeriaContextStorageOverride.class);

private static final ThreadLocal<Context> LOCAL_CONTEXT = new ThreadLocal<>();

private static final AttributeKey<Context> CONTEXT =
AttributeKey.valueOf(ArmeriaContextStorageOverride.class, "CONTEXT");

@Override
public Context doAttach(Context toAttach) {
RequestContext armeriaCtx = RequestContext.currentOrNull();
Context current = current(armeriaCtx);
if (armeriaCtx != null) {
armeriaCtx.setAttr(CONTEXT, toAttach);
} else {
LOCAL_CONTEXT.set(toAttach);
}
return current;
}

@Override
public void detach(Context toDetach, Context toRestore) {
RequestContext armeriaCtx = RequestContext.currentOrNull();
Context current = current(armeriaCtx);
if (current != toDetach) {
// Log a warning instead of throwing an exception as the context to attach is assumed
// to be the correct one and the unbalanced state represents a coding mistake in a lower
// layer in the stack that cannot be recovered from here.
if (logger.isWarnEnabled()) {
logger.warn("Context was not attached when detaching", new Throwable().fillInStackTrace());
}
}

if (toRestore == Context.ROOT) {
toRestore = null;
}
if (armeriaCtx != null) {
// We do not ever restore the ROOT context when in the context of an Armeria request. The
// context's lifecycle is effectively bound to the request, even through asynchronous flows,
// so we do not ever need to clear it explicitly. It will disappear along with the request
// when it's done.
if (toRestore != null) {
armeriaCtx.setAttr(CONTEXT, toRestore);
}
} else {
LOCAL_CONTEXT.set(toRestore);
}
}

@Override
public Context current() {
RequestContext armeriaCtx = RequestContext.currentOrNull();
return current(armeriaCtx);
}

private static Context current(RequestContext armeriaCtx) {
final Context current;
if (armeriaCtx != null) {
current = armeriaCtx.attr(CONTEXT);
} else {
current = LOCAL_CONTEXT.get();
}
return current != null ? current : Context.ROOT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ class ArmeriaTest extends AbstractArmeriaTest implements InstrumentationTestTrai
}

def childSetupSpec() {
server.before()
backend.before()
frontend.before()
}

def cleanupSpec() {
server.after()
backend.after()
frontend.after()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.opentelemetry.instrumentation.armeria.v1_0

import static io.opentelemetry.trace.Span.Kind.CLIENT
import static io.opentelemetry.trace.Span.Kind.SERVER
import static org.junit.Assume.assumeTrue

import com.linecorp.armeria.client.WebClient
import com.linecorp.armeria.client.WebClientBuilder
Expand All @@ -29,6 +30,7 @@ import com.linecorp.armeria.server.ServerBuilder
import com.linecorp.armeria.testing.junit4.server.ServerRule
import io.opentelemetry.auto.test.InstrumentationSpecification
import io.opentelemetry.trace.attributes.SemanticAttributes
import java.util.concurrent.CompletableFuture
import spock.lang.Shared
import spock.lang.Unroll

Expand All @@ -42,18 +44,41 @@ abstract class AbstractArmeriaTest extends InstrumentationSpecification {
// We cannot annotate with @ClassRule since then Armeria will be class loaded before bytecode
// instrumentation is set up by the Spock trait.
@Shared
protected ServerRule server = new ServerRule() {
protected ServerRule backend = new ServerRule() {
@Override
protected void configure(ServerBuilder sb) throws Exception {
sb = configureServer(sb)
sb.service("/hello", { ctx, req -> HttpResponse.of(HttpStatus.OK) })
}
}

// We cannot annotate with @ClassRule since then Armeria will be class loaded before bytecode
// instrumentation is set up by the Spock trait.
@Shared
protected ServerRule frontend = new ServerRule() {
@Override
protected void configure(ServerBuilder sb) throws Exception {
sb = configureServer(sb)
def backendClient = configureClient(WebClient.builder(backend.httpUri())).build()

sb.service("/exact", { ctx, req -> HttpResponse.of(HttpStatus.OK) })
sb.service("/items/{itemsId}", { ctx, req -> HttpResponse.of(HttpStatus.OK) })
sb.service("/httperror", { ctx, req -> HttpResponse.of(HttpStatus.NOT_IMPLEMENTED) })
sb.service("/exception", { ctx, req -> throw new IllegalStateException("illegal") })
sb.service("/async", { ctx, req ->
def executor = ctx.eventLoop()
CompletableFuture<HttpResponse> resp = backendClient.get("/hello").aggregate(executor)
.thenComposeAsync({ unused ->
backendClient.get("/hello").aggregate()
}, executor)
.thenApplyAsync({ unused -> HttpResponse.of(HttpStatus.OK)}, executor)

return HttpResponse.from(resp)
})
}
}

def client = configureClient(WebClient.builder(server.httpUri())).build()
def client = configureClient(WebClient.builder(frontend.httpUri())).build()

def "HTTP #method #path"() {
when:
Expand All @@ -73,7 +98,7 @@ abstract class AbstractArmeriaTest extends InstrumentationSpecification {
// TODO(anuraaga): peer name shouldn't be set to IP
"${SemanticAttributes.NET_PEER_NAME.key()}" "127.0.0.1"
"${SemanticAttributes.NET_PEER_PORT.key()}" Long
"${SemanticAttributes.HTTP_URL.key()}" "${server.httpUri()}${path}"
"${SemanticAttributes.HTTP_URL.key()}" "${frontend.httpUri()}${path}"
"${SemanticAttributes.HTTP_METHOD.key()}" method.name()
"${SemanticAttributes.HTTP_STATUS_CODE.key()}" code
}
Expand All @@ -89,7 +114,7 @@ abstract class AbstractArmeriaTest extends InstrumentationSpecification {
attributes {
"${SemanticAttributes.NET_PEER_IP.key()}" "127.0.0.1"
"${SemanticAttributes.NET_PEER_PORT.key()}" Long
"${SemanticAttributes.HTTP_URL.key()}" "${server.httpUri()}${path}"
"${SemanticAttributes.HTTP_URL.key()}" "${frontend.httpUri()}${path}"
"${SemanticAttributes.HTTP_METHOD.key()}" method.name()
"${SemanticAttributes.HTTP_STATUS_CODE.key()}" code
"${SemanticAttributes.HTTP_FLAVOR.key()}" "h2c"
Expand All @@ -108,4 +133,108 @@ abstract class AbstractArmeriaTest extends InstrumentationSpecification {
"/httperror" | "/httperror" | HttpMethod.GET | 501
"/exception" | "/exception" | HttpMethod.GET | 500
}

def "context propagated by executor"() {
when:
assumeTrue(supportsContext())
def response = client.get("/async").aggregate().join()

then:
response.status().code() == 200
assertTraces(1) {
trace(0, 6) {
span(0) {
operationName("HTTP GET")
spanKind CLIENT
parent()
attributes {
"${SemanticAttributes.NET_PEER_IP.key()}" "127.0.0.1"
// TODO(anuraaga): peer name shouldn't be set to IP
"${SemanticAttributes.NET_PEER_NAME.key()}" "127.0.0.1"
"${SemanticAttributes.NET_PEER_PORT.key()}" Long
"${SemanticAttributes.HTTP_URL.key()}" "${frontend.httpUri()}/async"
"${SemanticAttributes.HTTP_METHOD.key()}" "GET"
"${SemanticAttributes.HTTP_STATUS_CODE.key()}" 200
}
}
span(1) {
operationName("/async")
spanKind SERVER
childOf span(0)
attributes {
"${SemanticAttributes.NET_PEER_IP.key()}" "127.0.0.1"
"${SemanticAttributes.NET_PEER_PORT.key()}" Long
"${SemanticAttributes.HTTP_URL.key()}" "${frontend.httpUri()}/async"
"${SemanticAttributes.HTTP_METHOD.key()}" "GET"
"${SemanticAttributes.HTTP_STATUS_CODE.key()}" 200
"${SemanticAttributes.HTTP_FLAVOR.key()}" "h2c"
"${SemanticAttributes.HTTP_USER_AGENT.key()}" String
"${SemanticAttributes.HTTP_CLIENT_IP.key()}" "127.0.0.1"
}
}
span(2) {
operationName("HTTP GET")
spanKind CLIENT
childOf span(1)
attributes {
"${SemanticAttributes.NET_PEER_IP.key()}" "127.0.0.1"
// TODO(anuraaga): peer name shouldn't be set to IP
"${SemanticAttributes.NET_PEER_NAME.key()}" "127.0.0.1"
"${SemanticAttributes.NET_PEER_PORT.key()}" Long
"${SemanticAttributes.HTTP_URL.key()}" "${backend.httpUri()}/hello"
"${SemanticAttributes.HTTP_METHOD.key()}" "GET"
"${SemanticAttributes.HTTP_STATUS_CODE.key()}" 200
}
}
span(3) {
operationName("/hello")
spanKind SERVER
childOf span(2)
attributes {
"${SemanticAttributes.NET_PEER_IP.key()}" "127.0.0.1"
"${SemanticAttributes.NET_PEER_PORT.key()}" Long
"${SemanticAttributes.HTTP_URL.key()}" "${backend.httpUri()}/hello"
"${SemanticAttributes.HTTP_METHOD.key()}" "GET"
"${SemanticAttributes.HTTP_STATUS_CODE.key()}" 200
"${SemanticAttributes.HTTP_FLAVOR.key()}" "h2c"
"${SemanticAttributes.HTTP_USER_AGENT.key()}" String
"${SemanticAttributes.HTTP_CLIENT_IP.key()}" "127.0.0.1"
}
}
span(4) {
operationName("HTTP GET")
spanKind CLIENT
childOf span(1)
attributes {
"${SemanticAttributes.NET_PEER_IP.key()}" "127.0.0.1"
// TODO(anuraaga): peer name shouldn't be set to IP
"${SemanticAttributes.NET_PEER_NAME.key()}" "127.0.0.1"
"${SemanticAttributes.NET_PEER_PORT.key()}" Long
"${SemanticAttributes.HTTP_URL.key()}" "${backend.httpUri()}/hello"
"${SemanticAttributes.HTTP_METHOD.key()}" "GET"
"${SemanticAttributes.HTTP_STATUS_CODE.key()}" 200
}
}
span(5) {
operationName("/hello")
spanKind SERVER
childOf span(4)
attributes {
"${SemanticAttributes.NET_PEER_IP.key()}" "127.0.0.1"
"${SemanticAttributes.NET_PEER_PORT.key()}" Long
"${SemanticAttributes.HTTP_URL.key()}" "${backend.httpUri()}/hello"
"${SemanticAttributes.HTTP_METHOD.key()}" "GET"
"${SemanticAttributes.HTTP_STATUS_CODE.key()}" 200
"${SemanticAttributes.HTTP_FLAVOR.key()}" "h2c"
"${SemanticAttributes.HTTP_USER_AGENT.key()}" String
"${SemanticAttributes.HTTP_CLIENT_IP.key()}" "127.0.0.1"
}
}
}
}
}

boolean supportsContext() {
return true
}
}

0 comments on commit 56360a8

Please sign in to comment.