Skip to content

Commit

Permalink
Improve akka route handling with java dsl (#11926)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit authored Aug 9, 2024
1 parent d15927a commit 42d7177
Show file tree
Hide file tree
Showing 10 changed files with 432 additions and 13 deletions.
20 changes: 20 additions & 0 deletions instrumentation/akka/akka-http-10.0/javaagent/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,22 @@ dependencies {
latestDepTestLibrary("com.typesafe.akka:akka-stream_2.13:+")
}

testing {
suites {
val javaRouteTest by registering(JvmTestSuite::class) {
dependencies {
if (findProperty("testLatestDeps") as Boolean) {
implementation("com.typesafe.akka:akka-http_2.13:+")
implementation("com.typesafe.akka:akka-stream_2.13:+")
} else {
implementation("com.typesafe.akka:akka-http_2.12:10.2.0")
implementation("com.typesafe.akka:akka-stream_2.12:2.6.21")
}
}
}
}
}

tasks {
withType<Test>().configureEach {
// required on jdk17
Expand All @@ -52,6 +68,10 @@ tasks {

systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
}

check {
dependsOn(testing.suites)
}
}

if (findProperty("testLatestDeps") as Boolean) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.akkahttp;

import static akka.http.javadsl.server.PathMatchers.integerSegment;
import static org.assertj.core.api.Assertions.assertThat;

import akka.actor.ActorSystem;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.http.javadsl.server.AllDirectives;
import akka.http.javadsl.server.Route;
import io.opentelemetry.instrumentation.test.utils.PortUtils;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.testing.internal.armeria.client.WebClient;
import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpRequest;
import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpResponse;
import io.opentelemetry.testing.internal.armeria.common.HttpMethod;
import java.util.concurrent.CompletionStage;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class AkkaHttpServerJavaRouteTest extends AllDirectives {
@RegisterExtension
private static final InstrumentationExtension testing = AgentInstrumentationExtension.create();

private final WebClient client = WebClient.of();

@Test
void testRoute() {
ActorSystem system = ActorSystem.create("my-system");
int port = PortUtils.findOpenPort();
Http http = Http.get(system);

Route route =
concat(
pathEndOrSingleSlash(() -> complete("root")),
pathPrefix(
"test",
() ->
concat(
pathSingleSlash(() -> complete("test")),
path(integerSegment(), (i) -> complete("ok")))));

CompletionStage<ServerBinding> binding = http.newServerAt("localhost", port).bind(route);
try {
AggregatedHttpRequest request =
AggregatedHttpRequest.of(HttpMethod.GET, "h1c://localhost:" + port + "/test/1");
AggregatedHttpResponse response = client.execute(request).aggregate().join();

assertThat(response.status().code()).isEqualTo(200);
assertThat(response.contentUtf8()).isEqualTo("ok");

testing.waitAndAssertTraces(
trace -> trace.hasSpansSatisfyingExactly(span -> span.hasName("GET /test/*")));
} finally {
binding.thenCompose(ServerBinding::unbind).thenAccept(unbound -> system.terminate());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class AkkaRouteHolder implements ImplicitContextKeyed {
private static final ContextKey<AkkaRouteHolder> KEY = named("opentelemetry-akka-route");

private String route = "";
private boolean newSegment;
private boolean newSegment = true;
private boolean endMatched;
private final Deque<String> stack = new ArrayDeque<>();

Expand Down Expand Up @@ -70,6 +70,15 @@ public static void restore() {
}
}

// reset the state to when save was called
public static void reset() {
AkkaRouteHolder holder = Context.current().get(KEY);
if (holder != null) {
holder.route = holder.stack.peek();
holder.newSegment = true;
}
}

@Override
public Context storeInContext(Context context) {
return context.with(KEY, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,26 @@ public class RouteConcatenationInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return namedOneOf(
// scala 2.11
"akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1",
// scala 2.12 and later
"akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
namedOneOf("apply", "$anonfun$$tilde$1"), this.getClass().getName() + "$ApplyAdvice");
namedOneOf(
// scala 2.11
"apply",
// scala 2.12 and later
"$anonfun$$tilde$1"),
this.getClass().getName() + "$ApplyAdvice");

// This advice seems to be only needed when defining routes with java dsl. Since java dsl tests
// use scala 2.12 we are going to skip instrumenting this for scala 2.11.
transformer.applyAdviceToMethod(
namedOneOf("$anonfun$$tilde$2"), this.getClass().getName() + "$Apply2Advice");
}

@SuppressWarnings("unused")
Expand All @@ -43,4 +55,13 @@ public static void onExit() {
AkkaRouteHolder.restore();
}
}

@SuppressWarnings("unused")
public static class Apply2Advice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter() {
AkkaRouteHolder.reset();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.akkahttp

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives.{
IntNumber,
complete,
concat,
path,
pathEndOrSingleSlash,
pathPrefix,
pathSingleSlash
}
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import io.opentelemetry.instrumentation.test.utils.PortUtils
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension
import io.opentelemetry.sdk.testing.assertj.{SpanDataAssert, TraceAssert}
import io.opentelemetry.testing.internal.armeria.client.WebClient
import io.opentelemetry.testing.internal.armeria.common.{
AggregatedHttpRequest,
HttpMethod
}
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.{AfterAll, Test, TestInstance}
import org.junit.jupiter.api.extension.RegisterExtension

import java.net.{URI, URISyntaxException}
import java.util.function.Consumer
import scala.concurrent.duration.DurationInt
import scala.concurrent.Await

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class AkkaHttpServerRouteTest {
@RegisterExtension private val testing: AgentInstrumentationExtension =
AgentInstrumentationExtension.create
private val client: WebClient = WebClient.of()

implicit val system: ActorSystem = ActorSystem("my-system")
implicit val materializer: ActorMaterializer = ActorMaterializer()

private def buildAddress(port: Int): URI = try
new URI("http://localhost:" + port + "/")
catch {
case exception: URISyntaxException =>
throw new IllegalStateException(exception)
}

@Test def testSimple(): Unit = {
val route = path("test") {
complete("ok")
}

test(route, "/test", "GET /test")
}

@Test def testRoute(): Unit = {
val route = concat(
pathEndOrSingleSlash {
complete("root")
},
pathPrefix("test") {
concat(
pathSingleSlash {
complete("test")
},
path(IntNumber) { _ =>
complete("ok")
}
)
}
)

test(route, "/test/1", "GET /test/*")
}

def test(route: Route, path: String, spanName: String): Unit = {
val port = PortUtils.findOpenPort
val address: URI = buildAddress(port)
val binding =
Await.result(Http().bindAndHandle(route, "localhost", port), 10.seconds)
try {
val request = AggregatedHttpRequest.of(
HttpMethod.GET,
address.resolve(path).toString
)
val response = client.execute(request).aggregate.join
assertThat(response.status.code).isEqualTo(200)
assertThat(response.contentUtf8).isEqualTo("ok")

testing.waitAndAssertTraces(new Consumer[TraceAssert] {
override def accept(trace: TraceAssert): Unit =
trace.hasSpansSatisfyingExactly(new Consumer[SpanDataAssert] {
override def accept(span: SpanDataAssert): Unit = {
span.hasName(spanName)
}
})
})
} finally {
binding.unbind()
}
}

@AfterAll
def cleanUp(): Unit = {
system.terminate()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.route;

import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.named;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
Expand All @@ -16,15 +16,13 @@
public class PathConcatenationInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return namedOneOf(
"org.apache.pekko.http.scaladsl.server.PathMatcher$$anonfun$$tilde$1",
"org.apache.pekko.http.scaladsl.server.PathMatcher");
return named("org.apache.pekko.http.scaladsl.server.PathMatcher");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
namedOneOf("apply", "$anonfun$append$1"), this.getClass().getName() + "$ApplyAdvice");
named("$anonfun$append$1"), this.getClass().getName() + "$ApplyAdvice");
}

@SuppressWarnings("unused")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class PekkoRouteHolder implements ImplicitContextKeyed {
private static final ContextKey<PekkoRouteHolder> KEY = named("opentelemetry-pekko-route");

private String route = "";
private boolean newSegment;
private boolean newSegment = true;
private boolean endMatched;
private final Deque<String> stack = new ArrayDeque<>();

Expand Down Expand Up @@ -62,6 +62,14 @@ public static void save() {
}
}

public static void reset() {
PekkoRouteHolder holder = Context.current().get(KEY);
if (holder != null) {
holder.route = holder.stack.peek();
holder.newSegment = true;
}
}

public static void restore() {
PekkoRouteHolder holder = Context.current().get(KEY);
if (holder != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.route;

import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.named;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
Expand All @@ -16,15 +16,15 @@
public class RouteConcatenationInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return namedOneOf(
"org.apache.pekko.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1",
"org.apache.pekko.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation");
return named("org.apache.pekko.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
namedOneOf("apply", "$anonfun$$tilde$1"), this.getClass().getName() + "$ApplyAdvice");
named("$anonfun$$tilde$1"), this.getClass().getName() + "$ApplyAdvice");
transformer.applyAdviceToMethod(
named("$anonfun$$tilde$2"), this.getClass().getName() + "$Apply2Advice");
}

@SuppressWarnings("unused")
Expand All @@ -43,4 +43,13 @@ public static void onExit() {
PekkoRouteHolder.restore();
}
}

@SuppressWarnings("unused")
public static class Apply2Advice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter() {
PekkoRouteHolder.reset();
}
}
}
Loading

0 comments on commit 42d7177

Please sign in to comment.