Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate tracing for actors #1154

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion sdk-tests/src/test/java/io/dapr/it/DaprRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private DaprRun(String testName,
new Command(
successMessage,
buildDaprCommand(this.appName, serviceClass, ports, appProtocol),
daprApiToken == null ? null : Map.of("DAPR_API_TOKEN", daprApiToken));
buildEnvMap(daprApiToken));
this.listCommand = new Command(
this.appName,
"dapr list");
Expand Down Expand Up @@ -340,6 +340,15 @@ private static String resolveDaprApiToken(Class serviceClass) {
return DEFAULT_DAPR_API_TOKEN;
}

private static final Map<String, String> buildEnvMap(String daprApiToken) {
final Map<String, String> envMap = new HashMap<>();
envMap.put("DAPR_HOST_IP", "127.0.0.1");
if (daprApiToken != null) {
envMap.put("DAPR_API_TOKEN", daprApiToken);
}
return Collections.unmodifiableMap(envMap);
}

private static void assertListeningOnPort(int port) {
System.out.printf("Checking port %d ...\n", port);

Expand Down
124 changes: 124 additions & 0 deletions sdk-tests/src/test/java/io/dapr/it/actors/ActorTracingIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package io.dapr.it.actors;

import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import io.dapr.actors.ActorId;
import io.dapr.actors.client.ActorProxyBuilder;
import io.dapr.client.DaprClient;
import io.dapr.it.BaseIT;
import io.dapr.it.actors.app.MyActor;
import io.dapr.it.actors.app.MyActorService;
import io.dapr.it.tracing.http.OpenTelemetryConfig;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import net.minidev.json.JSONArray;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.junit.jupiter.api.Test;

import java.util.UUID;

import static io.dapr.it.tracing.OpenTelemetry.createOpenTelemetry;
import static io.dapr.it.tracing.OpenTelemetry.getReactorContext;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ActorTracingIT extends BaseIT {

@Test
public void testInvoke() throws Exception {
var run = startDaprApp(
ActorTracingIT.class.getSimpleName()+"Server",
MyActorService.SUCCESS_MESSAGE,
MyActorService.class,
true,
60000);
var clientRun = startDaprApp(
ActorTracingIT.class.getSimpleName()+"Client",
60000);

OpenTelemetry openTelemetry = createOpenTelemetry();
Tracer tracer = openTelemetry.getTracer(OpenTelemetryConfig.TRACER_NAME);
String spanName = UUID.randomUUID().toString();
Span span = tracer.spanBuilder(spanName).setSpanKind(SpanKind.CLIENT).startSpan();

try (DaprClient client = run.newDaprClientBuilder().build()) {
MyActor myActor =
new ActorProxyBuilder<>(
"MyActorTest",
MyActor.class,
clientRun.newActorClient()).build(new ActorId("123456"));
try (Scope scope = span.makeCurrent()) {
client.waitForSidecar(10000).block();
myActor.say("hello world")
.contextWrite(getReactorContext(openTelemetry))
.block();
}
} finally {
span.end();
}
Comment on lines +50 to +64
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an example showing how the client app should invoke actors and tracing headers to work.


Validation.validateGrandChild(
spanName,
"/dapr.proto.runtime.v1.dapr/invokeactor",
"callactor/myactortest/say");
}

private static final class Validation {

private static final OkHttpClient HTTP_CLIENT = new OkHttpClient();

/**
* JSON Path for main span Id.
*/
public static final String JSONPATH_MAIN_SPAN_ID = "$..[?(@.name == \"%s\")]['id']";

/**
* JSON Path for child span.
*/
public static final String JSONPATH_PARENT_SPAN_ID =
"$..[?(@.parentId=='%s' && @.name=='%s')]['id']";

public static void validateGrandChild(String grandParentSpanName, String parentSpanName, String grandChildSpanName) throws Exception {
// Must wait for some time to make sure Zipkin receives all spans.
Thread.sleep(10000);
HttpUrl.Builder urlBuilder = new HttpUrl.Builder();
urlBuilder.scheme("http")
.host("localhost")
.port(9411)
.addPathSegments("api/v2/traces")
.addQueryParameter("limit", "100");
Request.Builder requestBuilder = new Request.Builder()
.url(urlBuilder.build());
requestBuilder.method("GET", null);

Request request = requestBuilder.build();

Response response = HTTP_CLIENT.newCall(request).execute();
DocumentContext documentContext = JsonPath.parse(response.body().string());
String grandParentSpanId = readOne(documentContext, String.format(JSONPATH_MAIN_SPAN_ID, grandParentSpanName)).toString();
assertNotNull(grandParentSpanId);

String parentSpanId = readOne(documentContext, String.format(JSONPATH_PARENT_SPAN_ID, grandParentSpanId, parentSpanName))
.toString();
assertNotNull(parentSpanId);

String grandChildSpanId = readOne(documentContext, String.format(JSONPATH_PARENT_SPAN_ID, parentSpanId, grandChildSpanName))
.toString();
assertNotNull(grandChildSpanId);
}

private static Object readOne(DocumentContext documentContext, String path) {
JSONArray arr = documentContext.read(path);
assertTrue(arr.size() > 0);

return arr.get(0);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
package io.dapr.it.actors.app;

import io.dapr.actors.ActorMethod;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;

public interface MyActor {
String say(String something);
Mono<String> say(String something);

List<String> retrieveActiveActors();

Expand Down
26 changes: 14 additions & 12 deletions sdk-tests/src/test/java/io/dapr/it/actors/app/MyActorBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,21 @@ public MyActorBase(ActorRuntimeContext runtimeContext, ActorId id, TypeRef<T> re
private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

@Override
public String say(String something) {
String reversedString = "";
try {
this.formatAndLog(true, "say");
reversedString = new StringBuilder(something).reverse().toString();

this.formatAndLog(false, "say");
} catch(Exception e) {
// We don't throw, but the proxy side will know it failed because it expects a reversed input
System.out.println("Caught " + e);
}
public Mono<String> say(String something) {
return Mono.fromCallable(() -> {
String reversedString = "";
try {
this.formatAndLog(true, "say");
reversedString = new StringBuilder(something).reverse().toString();

this.formatAndLog(false, "say");
} catch(Exception e) {
// We don't throw, but the proxy side will know it failed because it expects a reversed input
System.out.println("Caught " + e);
}

return reversedString;
return reversedString;
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ public class OpenTelemetry {

/**
* Creates an opentelemetry instance.
* @param serviceName Name of the service in Zipkin
* @return OpenTelemetry.
*/
public static io.opentelemetry.api.OpenTelemetry createOpenTelemetry(String serviceName) throws InterruptedException {
public static io.opentelemetry.api.OpenTelemetry createOpenTelemetry() throws InterruptedException {
waitForZipkin();
String httpUrl = String.format("http://localhost:%d", ZIPKIN_PORT);
ZipkinSpanExporter zipkinExporter = ZipkinSpanExporter.builder().setEndpoint(httpUrl + ENDPOINT_V2_SPANS).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public final class Validation {
public static final String JSONPATH_SLEEP_SPAN_ID =
"$..[?(@.parentId=='%s' && @.duration > 1000000 && @.name=='%s')]['id']";

public static void validate(String spanName, String sleepSpanName) throws Exception {
public static void validateSleep(String spanName, String sleepSpanName) throws Exception {
// Must wait for some time to make sure Zipkin receives all spans.
Thread.sleep(10000);
HttpUrl.Builder urlBuilder = new HttpUrl.Builder();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.dapr.it.tracing.grpc;

import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.HttpExtension;
import io.dapr.it.AppRun;
import io.dapr.it.BaseIT;
Expand Down Expand Up @@ -42,7 +41,7 @@ public void setup() throws Exception {

@Test
public void testInvoke() throws Exception {
OpenTelemetry openTelemetry = createOpenTelemetry("service over grpc");
OpenTelemetry openTelemetry = createOpenTelemetry();
Tracer tracer = openTelemetry.getTracer("grpc integration test tracer");
String spanName = UUID.randomUUID().toString();
Span span = tracer.spanBuilder(spanName).setSpanKind(SpanKind.CLIENT).startSpan();
Expand All @@ -59,6 +58,6 @@ public void testInvoke() throws Exception {

span.end();

Validation.validate(spanName, "calllocal/tracingitgrpc-service/sleepovergrpc");
Validation.validateSleep(spanName, "calllocal/tracingitgrpc-service/sleepovergrpc");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@ public class OpenTelemetryConfig {

public static final String TRACER_NAME = "integration testing tracer";

public static final String SERVICE_NAME = "integration testing service over http";

@Bean
public OpenTelemetry initOpenTelemetry() throws InterruptedException {
return io.dapr.it.tracing.OpenTelemetry.createOpenTelemetry(SERVICE_NAME);
return io.dapr.it.tracing.OpenTelemetry.createOpenTelemetry();
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void setup() throws Exception {

@Test
public void testInvoke() throws Exception {
OpenTelemetry openTelemetry = createOpenTelemetry(OpenTelemetryConfig.SERVICE_NAME);
OpenTelemetry openTelemetry = createOpenTelemetry();
Tracer tracer = openTelemetry.getTracer(OpenTelemetryConfig.TRACER_NAME);
String spanName = UUID.randomUUID().toString();
Span span = tracer.spanBuilder(spanName).setSpanKind(SpanKind.CLIENT).startSpan();
Expand All @@ -57,7 +57,7 @@ public void testInvoke() throws Exception {

span.end();

Validation.validate(spanName, "calllocal/tracingithttp-service/sleep");
Validation.validateSleep(spanName, "calllocal/tracingithttp-service/sleep");
}

}
Loading