Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow an authenticated user to propagate down to subscribers #28

Merged
merged 1 commit into from
Sep 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.auth.User;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.grpc.common.GrpcStatus;
Expand Down Expand Up @@ -141,6 +144,12 @@ public abstract class AbstractCollector<
*/
private static final String CTX_OTEL_CONTEXT = "otelContext";

/**
* Vertx context parameter key for the user who is sending telemetry
* (if authentication was enabled).
*/
public static final String VCTX_EMITTER = "emitter";

/**
* The helper object for own telemetry.
*/
Expand Down Expand Up @@ -483,6 +492,12 @@ protected Batch<RECORD> publish(
span.setAttribute("otel.transport", transport);
span.setAttribute("otel.encoding", encoding);

User user = Vertx.currentContext().get(VCTX_EMITTER);
span.setAttribute("otel.is_authenticated", user != null);
if (user != null) {
setSpanAttributesForUser(span, user);
}

minDemand.record(estimateMinimumDemand());

try {
Expand Down Expand Up @@ -1227,6 +1242,44 @@ protected Context getOtelContext(final RoutingContext ctx) {
return ctx.get(CTX_OTEL_CONTEXT);
}

/**
* Set own-telemetry span attributes when an authenticated
* user is present.
*
* @param span the span to set the attributes to
* @param user the authenticated user who is emitting telemetry
*/
protected void setSpanAttributesForUser(
final Span span,
final User user) {
JsonObject principal = user.principal();

if (principal.containsKey("iss")) {
span.setAttribute("otel.auth.issuer",
principal.getString("iss"));
}

if (principal.containsKey("aud")) {
span.setAttribute("otel.auth.audience",
principal.getString("aud"));
}

if (principal.containsKey("sub")) {
span.setAttribute("otel.auth.subject",
principal.getString("sub"));
}

if (principal.containsKey("azp")) {
span.setAttribute("otel.auth.authorized_party",
principal.getString("azp"));
}

if (principal.containsKey("preferred_username")) {
span.setAttribute("otel.auth.preferred_username",
principal.getString("preferred_username"));
}
}

/**
* Parses the Content-Length header of an incoming HTTP request.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.proto.logs.v1.ScopeLogs;
import io.opentelemetry.proto.resource.v1.Resource;
import io.vertx.ext.auth.User;

/**
* Holds the details of an individual OTLP log record
Expand Down Expand Up @@ -80,9 +81,13 @@ public class Log extends SubscribersBatch<Log> {
* @param parent parent {@link Batch}
* @param otelContext {@link io.opentelemetry.context.Context} for own
* telemetry
* @param authUser the authenticated user or null if auth wasn't enabled
*/
public Log(final Batch<Log> parent, final Context otelContext) {
super(parent, otelContext);
public Log(
final Batch<Log> parent,
final Context otelContext,
final User authUser) {
super(parent, otelContext, authUser);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.opentelemetry.proto.logs.v1.LogRecord;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.proto.logs.v1.ScopeLogs;
import io.vertx.core.Vertx;

/**
* Processes incoming OpenTelemetry logs packets - extracts individual
Expand Down Expand Up @@ -108,7 +109,9 @@ public Batch<Log> loadBatch(
Span recordSpan = getInstrumentation()
.startNewSpan("otel.record");

Log l = new Log(batch, Context.current());
Log l = new Log(batch,
Context.current(),
Vertx.currentContext().get(VCTX_EMITTER));
l.setFrom(
timestamp,
uuid,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.opentelemetry.proto.metrics.v1.Summary;
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
import io.opentelemetry.proto.resource.v1.Resource;
import io.vertx.ext.auth.User;

/**
* Holds the details of an individual OTLP metric data point
Expand Down Expand Up @@ -146,11 +147,13 @@ public class MetricDataPoint extends SubscribersBatch<MetricDataPoint> {
* @param parent parent {@link Batch}
* @param otelContext {@link io.opentelemetry.context.Context} for own
* telemetry
* @param authUser the authenticated user or null if auth wasn't enabled
*/
public MetricDataPoint(
final Batch<MetricDataPoint> parent,
final Context otelContext) {
super(parent, otelContext);
final Context otelContext,
final User authUser) {
super(parent, otelContext, authUser);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.opentelemetry.proto.metrics.v1.Sum;
import io.opentelemetry.proto.metrics.v1.Summary;
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
import io.vertx.core.Vertx;

/**
* Processes incoming OpenTelemetry metrics packets - extracts all
Expand Down Expand Up @@ -141,7 +142,9 @@ public Batch<MetricDataPoint> loadBatch(

MetricDataPoint m = new MetricDataPoint(
batch,
Context.current());
Context.current(),
Vertx.currentContext()
.get(VCTX_EMITTER));
m.setFrom(timestamp,
uuid,
seqNo,
Expand Down Expand Up @@ -174,7 +177,9 @@ public Batch<MetricDataPoint> loadBatch(

MetricDataPoint m = new MetricDataPoint(
batch,
Context.current());
Context.current(),
Vertx.currentContext()
.get(VCTX_EMITTER));
m.setFrom(timestamp,
uuid,
seqNo,
Expand Down Expand Up @@ -207,7 +212,9 @@ public Batch<MetricDataPoint> loadBatch(

MetricDataPoint m = new MetricDataPoint(
batch,
Context.current());
Context.current(),
Vertx.currentContext()
.get(VCTX_EMITTER));
m.setFrom(timestamp,
uuid,
seqNo,
Expand Down Expand Up @@ -240,7 +247,9 @@ public Batch<MetricDataPoint> loadBatch(

MetricDataPoint m = new MetricDataPoint(
batch,
Context.current());
Context.current(),
Vertx.currentContext()
.get(VCTX_EMITTER));
m.setFrom(timestamp,
uuid,
seqNo,
Expand Down Expand Up @@ -272,7 +281,9 @@ public Batch<MetricDataPoint> loadBatch(

MetricDataPoint m = new MetricDataPoint(
batch,
Context.current());
Context.current(),
Vertx.currentContext()
.get(VCTX_EMITTER));
m.setFrom(timestamp,
uuid,
seqNo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.opentelemetry.proto.profiles.v1experimental.Sample;
import io.opentelemetry.proto.profiles.v1experimental.ScopeProfiles;
import io.opentelemetry.proto.resource.v1.Resource;
import io.vertx.ext.auth.User;

/**
* Holds an individual OTLP Profile Sample value as a {@link Batch}
Expand Down Expand Up @@ -115,11 +116,13 @@ public class ProfileSampleValue
* @param parent the parent {@link Batch}
* @param otelContext {@link io.opentelemetry.context.Context} for own
* telemetry
* @param authUser the authenticated user or null if auth wasn't enabled
*/
public ProfileSampleValue(
final Batch<ProfileSampleValue> parent,
final Context otelContext) {
super(parent, otelContext);
final Context otelContext,
final User authUser) {
super(parent, otelContext, authUser);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.opentelemetry.proto.profiles.v1experimental.ResourceProfiles;
import io.opentelemetry.proto.profiles.v1experimental.Sample;
import io.opentelemetry.proto.profiles.v1experimental.ScopeProfiles;
import io.vertx.core.Vertx;

/**
* Processes incoming OpenTelemetry profiles packets - extracts individual
Expand Down Expand Up @@ -137,7 +138,9 @@ protected Batch<ProfileSampleValue> loadBatch(
ProfileSampleValue lv =
new ProfileSampleValue(
batch,
Context.current());
Context.current(),
Vertx.currentContext()
.get(VCTX_EMITTER));
lv.setFrom(
timestamp,
uuid,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.opentelemetry.proto.resource.v1.Resource;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.proto.trace.v1.ScopeSpans;
import io.vertx.ext.auth.User;

/**
* Holds the details of an individual OTLP span record
Expand Down Expand Up @@ -79,9 +80,13 @@ public class Span extends SubscribersBatch<Span> {
* @param parent parent {@link Batch}
* @param otelContext {@link io.opentelemetry.context.Context} for own
* telemetry
* @param authUser the authenticated user or null if auth wasn't enabled
*/
public Span(final Batch<Span> parent, final Context otelContext) {
super(parent, otelContext);
public Span(
final Batch<Span> parent,
final Context otelContext,
final User authUser) {
super(parent, otelContext, authUser);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.Flow.Subscriber;

import io.opentelemetry.context.Context;
import io.vertx.ext.auth.User;

/**
* A 'batch' of all {@link LogsSubscriber}s, {@link MetricsSubscriber}s
Expand All @@ -30,18 +31,27 @@
*/
public class SubscribersBatch<T> extends Batch<Subscriber<? super T>> {

/**
* Holds the user who emitted this signal (if authentication was enabled).
*/
private User user;

/**
* Create a new 'batch' of subscribers.
*
* @param parent the batch of OpenTelemetry data
* @param otelContext {@link io.opentelemetry.context.Context} for
* @param authUser the authenticated user or null if auth wasn't enabled
* own telemetry
*/
public SubscribersBatch(
final Batch<T> parent,
final Context otelContext) {
final Context otelContext,
final User authUser) {
super(otelContext);

this.user = authUser;

@SuppressWarnings("unchecked")
T self = (T) this;

Expand All @@ -64,4 +74,22 @@ protected void otelComplete(final Throwable t) {
* span in a subscriber
*/
}

/**
* Returns the authenticated user (if authentication was enabled).
*
* @return the user or null if authentication was not enabled.
*/
public User getUser() {
return user;
}

/**
* Set the authenticated user.
*
* @param authUser the user
*/
public void setUser(final User authUser) {
this.user = authUser;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.proto.trace.v1.ScopeSpans;
import io.vertx.core.Vertx;

/**
* Processes incoming OpenTelemetry traces packets - extracts individual
Expand Down Expand Up @@ -109,7 +110,9 @@ public Batch<Span> loadBatch(
getInstrumentation()
.startNewSpan("otel.record");

Span s = new Span(batch, otelContext);
Span s = new Span(batch,
otelContext,
Vertx.currentContext().get(VCTX_EMITTER));
s.setFrom(
timestamp,
uuid,
Expand Down
Loading