From 4000478cd7af1eac323fc245f0a2b89a531617ff Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Tue, 22 Oct 2024 18:17:57 -0700 Subject: [PATCH 1/3] IT to verify actor tracing Signed-off-by: Artur Souza --- .../io/dapr/it/actors/ActorTracingIT.java | 59 +++++++++++++++++++ .../java/io/dapr/it/actors/app/MyActor.java | 3 +- .../io/dapr/it/actors/app/MyActorBase.java | 26 ++++---- 3 files changed, 75 insertions(+), 13 deletions(-) create mode 100644 sdk-tests/src/test/java/io/dapr/it/actors/ActorTracingIT.java diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/ActorTracingIT.java b/sdk-tests/src/test/java/io/dapr/it/actors/ActorTracingIT.java new file mode 100644 index 000000000..6f0b8c96a --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/actors/ActorTracingIT.java @@ -0,0 +1,59 @@ +package io.dapr.it.actors; + +import io.dapr.actors.ActorId; +import io.dapr.actors.client.ActorProxyBuilder; +import io.dapr.client.DaprClient; +import io.dapr.it.BaseIT; +import io.dapr.it.actors.app.MyActor; +import io.dapr.it.actors.app.MyActorService; +import io.dapr.it.tracing.Validation; +import io.dapr.it.tracing.http.OpenTelemetryConfig; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; +import org.junit.jupiter.api.Test; + +import java.util.UUID; + +import static io.dapr.it.tracing.OpenTelemetry.createOpenTelemetry; +import static io.dapr.it.tracing.OpenTelemetry.getReactorContext; + +public class ActorTracingIT extends BaseIT { + + @Test + public void testInvoke() throws Exception { + var run = startDaprApp( + ActorTracingIT.class.getSimpleName(), + MyActorService.SUCCESS_MESSAGE, + MyActorService.class, + true, + 60000); + var clientRun = startDaprApp( + ActorTracingIT.class.getSimpleName()+"Client", + 60000); + + OpenTelemetry openTelemetry = createOpenTelemetry(OpenTelemetryConfig.SERVICE_NAME); + Tracer tracer = openTelemetry.getTracer(OpenTelemetryConfig.TRACER_NAME); + String spanName = UUID.randomUUID().toString(); + Span span = tracer.spanBuilder(spanName).setSpanKind(SpanKind.CLIENT).startSpan(); + + try (DaprClient client = run.newDaprClientBuilder().build()) { + client.waitForSidecar(10000).block(); + MyActor myActor = + new ActorProxyBuilder<>( + "MyActorTest", + MyActor.class, + clientRun.newActorClient()).build(new ActorId("123456")); + try (Scope scope = span.makeCurrent()) { + myActor.say("hello world").contextWrite(getReactorContext(openTelemetry)).block(); + } + } + + span.end(); + + Validation.validate(spanName, "calllocal/tracingithttp-service/say"); + } + +} diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActor.java b/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActor.java index 4d1d1965d..0c5f67888 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActor.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActor.java @@ -14,12 +14,13 @@ package io.dapr.it.actors.app; import io.dapr.actors.ActorMethod; +import reactor.core.publisher.Mono; import java.util.ArrayList; import java.util.List; public interface MyActor { - String say(String something); + Mono say(String something); List retrieveActiveActors(); diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActorBase.java b/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActorBase.java index b05dc3b2b..6360009dc 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActorBase.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActorBase.java @@ -60,19 +60,21 @@ public MyActorBase(ActorRuntimeContext runtimeContext, ActorId id, TypeRef re private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); @Override - public String say(String something) { - String reversedString = ""; - try { - this.formatAndLog(true, "say"); - reversedString = new StringBuilder(something).reverse().toString(); - - this.formatAndLog(false, "say"); - } catch(Exception e) { - // We don't throw, but the proxy side will know it failed because it expects a reversed input - System.out.println("Caught " + e); - } + public Mono say(String something) { + return Mono.fromCallable(() -> { + String reversedString = ""; + try { + this.formatAndLog(true, "say"); + reversedString = new StringBuilder(something).reverse().toString(); + + this.formatAndLog(false, "say"); + } catch(Exception e) { + // We don't throw, but the proxy side will know it failed because it expects a reversed input + System.out.println("Caught " + e); + } - return reversedString; + return reversedString; + }); } @Override From f4f80be3c78b425a203b760eec7e51c95c26e7da Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Tue, 12 Nov 2024 16:02:49 -0800 Subject: [PATCH 2/3] Fix IT showing that actor tracing is working. Signed-off-by: Artur Souza --- .../src/test/java/io/dapr/it/DaprRun.java | 11 ++- .../io/dapr/it/actors/ActorTracingIT.java | 81 +++++++++++++++++-- .../io/dapr/it/tracing/OpenTelemetry.java | 3 +- .../java/io/dapr/it/tracing/Validation.java | 2 +- .../io/dapr/it/tracing/grpc/TracingIT.java | 5 +- .../it/tracing/http/OpenTelemetryConfig.java | 4 +- .../io/dapr/it/tracing/http/TracingIT.java | 4 +- 7 files changed, 90 insertions(+), 20 deletions(-) diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java index 99c38bfff..c074e3593 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java @@ -106,7 +106,7 @@ private DaprRun(String testName, new Command( successMessage, buildDaprCommand(this.appName, serviceClass, ports, appProtocol), - daprApiToken == null ? null : Map.of("DAPR_API_TOKEN", daprApiToken)); + buildEnvMap(daprApiToken)); this.listCommand = new Command( this.appName, "dapr list"); @@ -340,6 +340,15 @@ private static String resolveDaprApiToken(Class serviceClass) { return DEFAULT_DAPR_API_TOKEN; } + private static final Map buildEnvMap(String daprApiToken) { + final Map envMap = new HashMap<>(); + envMap.put("DAPR_HOST_IP", "127.0.01"); + if (daprApiToken != null) { + envMap.put("DAPR_API_TOKEN", daprApiToken); + } + return Collections.unmodifiableMap(envMap); + } + private static void assertListeningOnPort(int port) { System.out.printf("Checking port %d ...\n", port); diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/ActorTracingIT.java b/sdk-tests/src/test/java/io/dapr/it/actors/ActorTracingIT.java index 6f0b8c96a..0edeb3c30 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/ActorTracingIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/ActorTracingIT.java @@ -1,31 +1,39 @@ package io.dapr.it.actors; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; import io.dapr.actors.ActorId; import io.dapr.actors.client.ActorProxyBuilder; import io.dapr.client.DaprClient; import io.dapr.it.BaseIT; import io.dapr.it.actors.app.MyActor; import io.dapr.it.actors.app.MyActorService; -import io.dapr.it.tracing.Validation; import io.dapr.it.tracing.http.OpenTelemetryConfig; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Scope; +import net.minidev.json.JSONArray; +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; import org.junit.jupiter.api.Test; import java.util.UUID; import static io.dapr.it.tracing.OpenTelemetry.createOpenTelemetry; import static io.dapr.it.tracing.OpenTelemetry.getReactorContext; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; public class ActorTracingIT extends BaseIT { @Test public void testInvoke() throws Exception { var run = startDaprApp( - ActorTracingIT.class.getSimpleName(), + ActorTracingIT.class.getSimpleName()+"Server", MyActorService.SUCCESS_MESSAGE, MyActorService.class, true, @@ -34,26 +42,83 @@ public void testInvoke() throws Exception { ActorTracingIT.class.getSimpleName()+"Client", 60000); - OpenTelemetry openTelemetry = createOpenTelemetry(OpenTelemetryConfig.SERVICE_NAME); + OpenTelemetry openTelemetry = createOpenTelemetry(); Tracer tracer = openTelemetry.getTracer(OpenTelemetryConfig.TRACER_NAME); String spanName = UUID.randomUUID().toString(); Span span = tracer.spanBuilder(spanName).setSpanKind(SpanKind.CLIENT).startSpan(); try (DaprClient client = run.newDaprClientBuilder().build()) { - client.waitForSidecar(10000).block(); MyActor myActor = new ActorProxyBuilder<>( "MyActorTest", MyActor.class, clientRun.newActorClient()).build(new ActorId("123456")); try (Scope scope = span.makeCurrent()) { - myActor.say("hello world").contextWrite(getReactorContext(openTelemetry)).block(); + client.waitForSidecar(10000).block(); + myActor.say("hello world") + .contextWrite(getReactorContext(openTelemetry)) + .block(); } + } finally { + span.end(); } - span.end(); - - Validation.validate(spanName, "calllocal/tracingithttp-service/say"); + Validation.validateGrandChild( + spanName, + "/dapr.proto.runtime.v1.dapr/invokeactor", + "callactor/myactortest/say"); } + private static final class Validation { + + private static final OkHttpClient HTTP_CLIENT = new OkHttpClient(); + + /** + * JSON Path for main span Id. + */ + public static final String JSONPATH_MAIN_SPAN_ID = "$..[?(@.name == \"%s\")]['id']"; + + /** + * JSON Path for child span. + */ + public static final String JSONPATH_PARENT_SPAN_ID = + "$..[?(@.parentId=='%s' && @.name=='%s')]['id']"; + + public static void validateGrandChild(String grandParentSpanName, String parentSpanName, String grandChildSpanName) throws Exception { + // Must wait for some time to make sure Zipkin receives all spans. + Thread.sleep(10000); + HttpUrl.Builder urlBuilder = new HttpUrl.Builder(); + urlBuilder.scheme("http") + .host("localhost") + .port(9411) + .addPathSegments("api/v2/traces") + .addQueryParameter("limit", "100"); + Request.Builder requestBuilder = new Request.Builder() + .url(urlBuilder.build()); + requestBuilder.method("GET", null); + + Request request = requestBuilder.build(); + + Response response = HTTP_CLIENT.newCall(request).execute(); + DocumentContext documentContext = JsonPath.parse(response.body().string()); + String grandParentSpanId = readOne(documentContext, String.format(JSONPATH_MAIN_SPAN_ID, grandParentSpanName)).toString(); + assertNotNull(grandParentSpanId); + + String parentSpanId = readOne(documentContext, String.format(JSONPATH_PARENT_SPAN_ID, grandParentSpanId, parentSpanName)) + .toString(); + assertNotNull(parentSpanId); + + String grandChildSpanId = readOne(documentContext, String.format(JSONPATH_PARENT_SPAN_ID, parentSpanId, grandChildSpanName)) + .toString(); + assertNotNull(grandChildSpanId); + } + + private static Object readOne(DocumentContext documentContext, String path) { + JSONArray arr = documentContext.read(path); + assertTrue(arr.size() > 0); + + return arr.get(0); + } + + } } diff --git a/sdk-tests/src/test/java/io/dapr/it/tracing/OpenTelemetry.java b/sdk-tests/src/test/java/io/dapr/it/tracing/OpenTelemetry.java index 08a6ea88f..98fad6f08 100644 --- a/sdk-tests/src/test/java/io/dapr/it/tracing/OpenTelemetry.java +++ b/sdk-tests/src/test/java/io/dapr/it/tracing/OpenTelemetry.java @@ -34,10 +34,9 @@ public class OpenTelemetry { /** * Creates an opentelemetry instance. - * @param serviceName Name of the service in Zipkin * @return OpenTelemetry. */ - public static io.opentelemetry.api.OpenTelemetry createOpenTelemetry(String serviceName) throws InterruptedException { + public static io.opentelemetry.api.OpenTelemetry createOpenTelemetry() throws InterruptedException { waitForZipkin(); String httpUrl = String.format("http://localhost:%d", ZIPKIN_PORT); ZipkinSpanExporter zipkinExporter = ZipkinSpanExporter.builder().setEndpoint(httpUrl + ENDPOINT_V2_SPANS).build(); diff --git a/sdk-tests/src/test/java/io/dapr/it/tracing/Validation.java b/sdk-tests/src/test/java/io/dapr/it/tracing/Validation.java index 609a2f521..18ac9969b 100644 --- a/sdk-tests/src/test/java/io/dapr/it/tracing/Validation.java +++ b/sdk-tests/src/test/java/io/dapr/it/tracing/Validation.java @@ -44,7 +44,7 @@ public final class Validation { public static final String JSONPATH_SLEEP_SPAN_ID = "$..[?(@.parentId=='%s' && @.duration > 1000000 && @.name=='%s')]['id']"; - public static void validate(String spanName, String sleepSpanName) throws Exception { + public static void validateSleep(String spanName, String sleepSpanName) throws Exception { // Must wait for some time to make sure Zipkin receives all spans. Thread.sleep(10000); HttpUrl.Builder urlBuilder = new HttpUrl.Builder(); diff --git a/sdk-tests/src/test/java/io/dapr/it/tracing/grpc/TracingIT.java b/sdk-tests/src/test/java/io/dapr/it/tracing/grpc/TracingIT.java index 256347398..a89c52574 100644 --- a/sdk-tests/src/test/java/io/dapr/it/tracing/grpc/TracingIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/tracing/grpc/TracingIT.java @@ -1,7 +1,6 @@ package io.dapr.it.tracing.grpc; import io.dapr.client.DaprClient; -import io.dapr.client.DaprClientBuilder; import io.dapr.client.domain.HttpExtension; import io.dapr.it.AppRun; import io.dapr.it.BaseIT; @@ -42,7 +41,7 @@ public void setup() throws Exception { @Test public void testInvoke() throws Exception { - OpenTelemetry openTelemetry = createOpenTelemetry("service over grpc"); + OpenTelemetry openTelemetry = createOpenTelemetry(); Tracer tracer = openTelemetry.getTracer("grpc integration test tracer"); String spanName = UUID.randomUUID().toString(); Span span = tracer.spanBuilder(spanName).setSpanKind(SpanKind.CLIENT).startSpan(); @@ -59,6 +58,6 @@ public void testInvoke() throws Exception { span.end(); - Validation.validate(spanName, "calllocal/tracingitgrpc-service/sleepovergrpc"); + Validation.validateSleep(spanName, "calllocal/tracingitgrpc-service/sleepovergrpc"); } } diff --git a/sdk-tests/src/test/java/io/dapr/it/tracing/http/OpenTelemetryConfig.java b/sdk-tests/src/test/java/io/dapr/it/tracing/http/OpenTelemetryConfig.java index 8a976f93d..d6257abd3 100644 --- a/sdk-tests/src/test/java/io/dapr/it/tracing/http/OpenTelemetryConfig.java +++ b/sdk-tests/src/test/java/io/dapr/it/tracing/http/OpenTelemetryConfig.java @@ -28,11 +28,9 @@ public class OpenTelemetryConfig { public static final String TRACER_NAME = "integration testing tracer"; - public static final String SERVICE_NAME = "integration testing service over http"; - @Bean public OpenTelemetry initOpenTelemetry() throws InterruptedException { - return io.dapr.it.tracing.OpenTelemetry.createOpenTelemetry(SERVICE_NAME); + return io.dapr.it.tracing.OpenTelemetry.createOpenTelemetry(); } @Bean diff --git a/sdk-tests/src/test/java/io/dapr/it/tracing/http/TracingIT.java b/sdk-tests/src/test/java/io/dapr/it/tracing/http/TracingIT.java index 3285d2ca8..7e05ba90c 100644 --- a/sdk-tests/src/test/java/io/dapr/it/tracing/http/TracingIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/tracing/http/TracingIT.java @@ -41,7 +41,7 @@ public void setup() throws Exception { @Test public void testInvoke() throws Exception { - OpenTelemetry openTelemetry = createOpenTelemetry(OpenTelemetryConfig.SERVICE_NAME); + OpenTelemetry openTelemetry = createOpenTelemetry(); Tracer tracer = openTelemetry.getTracer(OpenTelemetryConfig.TRACER_NAME); String spanName = UUID.randomUUID().toString(); Span span = tracer.spanBuilder(spanName).setSpanKind(SpanKind.CLIENT).startSpan(); @@ -57,7 +57,7 @@ public void testInvoke() throws Exception { span.end(); - Validation.validate(spanName, "calllocal/tracingithttp-service/sleep"); + Validation.validateSleep(spanName, "calllocal/tracingithttp-service/sleep"); } } From bcdda6c92a4402c4c7c7514625127329ca5fa2b4 Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Mon, 18 Nov 2024 09:56:49 -0800 Subject: [PATCH 3/3] Update DaprRun.java --- sdk-tests/src/test/java/io/dapr/it/DaprRun.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java index c074e3593..839eaa861 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java @@ -342,7 +342,7 @@ private static String resolveDaprApiToken(Class serviceClass) { private static final Map buildEnvMap(String daprApiToken) { final Map envMap = new HashMap<>(); - envMap.put("DAPR_HOST_IP", "127.0.01"); + envMap.put("DAPR_HOST_IP", "127.0.0.1"); if (daprApiToken != null) { envMap.put("DAPR_API_TOKEN", daprApiToken); }