Skip to content

Commit

Permalink
gRPC and SSE coverage for OpenTelemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
rsvoboda committed Jan 18, 2023
1 parent 2350b46 commit f327c60
Show file tree
Hide file tree
Showing 24 changed files with 810 additions and 0 deletions.
48 changes: 48 additions & 0 deletions monitoring/opentelemetry-reactive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,58 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-opentelemetry</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus.qe</groupId>
<artifactId>quarkus-test-service-jaeger</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>generate-code</goal>
<goal>generate-code-tests</goal>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<!-- Skipped on Windows as does not support Linux Containers / Testcontainers -->
<profile>
<id>skip-tests-on-windows</id>
<activation>
<os>
<family>windows</family>
</os>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.quarkus.ts.opentelemetry.reactive.grpc;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import io.quarkus.example.PongRequest;
import io.quarkus.example.PongServiceGrpc;
import io.quarkus.grpc.GrpcClient;
import io.quarkus.ts.opentelemetry.reactive.traceable.TraceableResource;

@Path("/grpc-ping")
public class GrpcPingResource extends TraceableResource {

@Inject
@GrpcClient("pong")
PongServiceGrpc.PongServiceBlockingStub pongClient;

@GET
@Produces(MediaType.TEXT_PLAIN)
public String getPing() {
recordTraceId();

return "ping " + pongClient.sayPong(PongRequest.newBuilder().build()).getMessage();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.quarkus.ts.opentelemetry.reactive.grpc;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.jboss.logging.Logger;

import io.quarkus.example.LastTraceIdRequest;
import io.quarkus.example.PongServiceGrpc;
import io.quarkus.grpc.GrpcClient;
import io.quarkus.ts.opentelemetry.reactive.traceable.TraceableResource;

@Path("/grpc-pong")
public class GrpcPongResource {

@Inject
@GrpcClient("pong")
PongServiceGrpc.PongServiceBlockingStub pongClient;

private static final Logger LOG = Logger.getLogger(TraceableResource.class);

@GET
@Path("/lastTraceId")
@Produces(MediaType.TEXT_PLAIN)
public String getLastTraceId() {
String lastTraceId = pongClient.returnLastTraceId(LastTraceIdRequest.newBuilder().build()).getMessage();
LOG.info("Recorded trace ID: " + lastTraceId);
return lastTraceId;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.quarkus.ts.opentelemetry.reactive.grpc;

import org.jboss.logmanager.MDC;

import io.grpc.stub.StreamObserver;
import io.quarkus.example.LastTraceIdReply;
import io.quarkus.example.LastTraceIdRequest;
import io.quarkus.example.PongReply;
import io.quarkus.example.PongRequest;
import io.quarkus.example.PongServiceGrpc;
import io.quarkus.grpc.GrpcService;

@GrpcService
public class GrpcPongService extends PongServiceGrpc.PongServiceImplBase {

private String lastTraceId;

@Override
public void sayPong(PongRequest request, StreamObserver<PongReply> responseObserver) {
lastTraceId = MDC.get("traceId");
responseObserver.onNext(PongReply.newBuilder().setMessage("pong").build());
responseObserver.onCompleted();
}

@Override
public void returnLastTraceId(LastTraceIdRequest request, StreamObserver<LastTraceIdReply> responseObserver) {
responseObserver.onNext(LastTraceIdReply.newBuilder().setMessage(getLastTraceId()).build());
responseObserver.onCompleted();
}

public String getLastTraceId() {
return lastTraceId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.quarkus.ts.opentelemetry.reactive.sse;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.rest.client.inject.RestClient;

import io.quarkus.ts.opentelemetry.reactive.traceable.TraceableResource;
import io.smallrye.mutiny.Multi;

@Path("/server-sent-events-ping")
public class ServerSentEventsPingResource extends TraceableResource {

@Inject
@RestClient
ServerSentEventsPongClient pongClient;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> getPing() {
recordTraceId();
return pongClient.getPong().map(response -> "ping " + response);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.quarkus.ts.opentelemetry.reactive.sse;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;

import io.smallrye.mutiny.Multi;

@RegisterRestClient
public interface ServerSentEventsPongClient {
@GET
@Path("/server-sent-events-pong")
@Produces(MediaType.SERVER_SENT_EVENTS)
Multi<String> getPong();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.quarkus.ts.opentelemetry.reactive.sse;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import io.quarkus.ts.opentelemetry.reactive.traceable.TraceableResource;
import io.smallrye.mutiny.Multi;

@Path("/server-sent-events-pong")
public class ServerSentEventsPongResource extends TraceableResource {

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> getPong() {
recordTraceId();
return Multi.createFrom().item("pong");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.quarkus.ts.opentelemetry.reactive.traceable;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.jboss.logging.Logger;
import org.jboss.logmanager.MDC;

public abstract class TraceableResource {

private static final Logger LOG = Logger.getLogger(TraceableResource.class);

private String lastTraceId;

@GET
@Path("/lastTraceId")
@Produces(MediaType.TEXT_PLAIN)
public String getLastTraceId() {
return lastTraceId;
}

protected void recordTraceId() {
lastTraceId = MDC.get("traceId");
LOG.info("Recorded trace ID: " + lastTraceId);
}
}
26 changes: 26 additions & 0 deletions monitoring/opentelemetry-reactive/src/main/proto/pong.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.quarkus.example";
option java_outer_classname = "PongProto";

package io.quarkus.example;

service PongService {
rpc SayPong (PongRequest) returns (PongReply) {}
rpc ReturnLastTraceId (LastTraceIdRequest) returns (LastTraceIdReply) {}
}

message PongRequest {
}

message PongReply {
string message = 1;
}

message LastTraceIdRequest {
}

message LastTraceIdReply {
string message = 1;
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
io.quarkus.ts.opentelemetry.reactive.PingPongService/mp-rest/url=${pongservice_url}:${pongservice_port}
io.quarkus.ts.opentelemetry.reactive.PingPongService/mp-rest/scope=javax.inject.Singleton

io.quarkus.ts.opentelemetry.reactive.sse.ServerSentEventsPongClient/mp-rest/url=http://localhost:${quarkus.http.port}
io.quarkus.ts.opentelemetry.reactive.sse.ServerSentEventsPongClient/mp-rest/scope=javax.inject.Singleton

# gRPC
quarkus.grpc.clients.pong.host=localhost

quarkus.application.name=pingpong
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package io.quarkus.ts.opentelemetry.reactive;

import static io.restassured.RestAssured.given;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.concurrent.TimeUnit;

import org.apache.http.HttpStatus;
import org.junit.jupiter.api.Test;

import io.quarkus.test.bootstrap.JaegerService;
import io.quarkus.test.bootstrap.RestService;
import io.quarkus.test.scenarios.QuarkusScenario;
import io.quarkus.test.services.JaegerContainer;
import io.quarkus.test.services.QuarkusApplication;

@QuarkusScenario
public class OpenTelemetryGrpcIT {

@JaegerContainer(useOtlpCollector = true)
static final JaegerService jaeger = new JaegerService();

@QuarkusApplication()
static RestService app = new RestService()
.withProperty("quarkus.application.name", "pingpong")
.withProperty("quarkus.opentelemetry.tracer.exporter.otlp.endpoint", jaeger::getCollectorUrl);

private static final String PING_ENDPOINT = "/grpc-ping";
private static final String PONG_ENDPOINT = "/grpc-pong";
private static final String SAY_PONG_PROTO = "SayPong";

@Test
public void testServerClientTrace() throws InterruptedException {
// When calling ping, the rest will invoke also the pong rest endpoint.
given()
.when().get(PING_ENDPOINT)
.then().statusCode(HttpStatus.SC_OK)
.body(containsString("ping pong"));

// Then both ping and pong rest endpoints should have the same trace Id.
String pingTraceId = given()
.when().get(PING_ENDPOINT + "/lastTraceId")
.then().statusCode(HttpStatus.SC_OK).and().extract().asString();

assertTraceIdWithPongService(pingTraceId);

// Then Jaeger is invoked
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> given()
.when().get(jaeger.getTraceUrl() + "?traceID=" + pingTraceId)
.then().statusCode(HttpStatus.SC_OK)
.and().body(allOf(containsString(PING_ENDPOINT), containsString(SAY_PONG_PROTO))));
}

protected void assertTraceIdWithPongService(String expected) {
String pongTraceId = given()
.when().get(PONG_ENDPOINT + "/lastTraceId")
.then().statusCode(HttpStatus.SC_OK).and().extract().asString();

assertEquals(expected, pongTraceId);
}

}
Loading

0 comments on commit f327c60

Please sign in to comment.