Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…erver-embedded into publish-distributedlog
  • Loading branch information
arusevm committed Sep 28, 2024
2 parents b91a6fc + 37c9c7b commit 4969c4c
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 24 deletions.
41 changes: 34 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,47 @@
# OpenTelemetry Data Sources for Java

This repository contains OpenTelemetry servers that can be embedded into other Java-based systems to act as data sources
This repository contains [OpenTelemetry](https://opentelemetry.io/) servers that can be embedded into other Java-based systems to act as data sources
for logs, metrics, traces and profiles signals.

Here you can also find implementations of such data sources for a few popular open source softwares and additional tools
to use when working with OpenTelemetry data.

It is also part of a broader set of [OpenTelemetry-related activities](https://mishmash.io/open_source/opentelemetry) at
[mishmash io](https://mishmash.io/).

This is a public release of code we have accumulated internally over time and so far contains only a limited subset of
what we intend to share. Future releases will add modules for authentication and authorization, visualization and more.

Watch this repository for updates.

Also take a look at the READMEs of the individual packages in this repository:
# OpenTelemetry for Developers, Data Engineers and Data Scientists

We have prepared a few Jupyter notebooks that visually explore OpenTelemetry data that we collected from [a demo Astronomy webshop app](https://github.com/mishmash-io/opentelemetry-demo-to-parquet)
using the [Apache Parquet Stand-alone server](./server-parquet) contained in this repository.

If you are the sort of person who prefers to learn by looking at **actual data** - start with the [OpenTelemetry Basics Notebook.](./examples/notebooks/basics.ipynb)

# Artifacts

## Embeddable collectors

The base artifact - `collector-embedded` contains classes that handle the OTLP protocol (over both gRPC and HTTP).
- [README](./collector-embedded)
- [Javadoc on javadoc.io](https://javadoc.io/doc/io.mishmash.opentelemetry/collector-embedded)

## Apache Parquet Stand-alone server

This artifact contains a runnable OTLP-protocol server that receives signals from OpenTelemetry and saves them into [Apache Paruqet](https://parquet.apache.org/) files.

It is not intended for production use, but rather as a quick tool to save and explore OpenTelemetry data locally. [The Basics Jupyter Notebook](./examples/notebooks/basics.ipynb) explores
Parquet files as saved by this Stand-alone server.
- [README](./server-parquet)
- [Javadoc on javadoc.io](https://javadoc.io/doc/io.mishmash.opentelemetry/server-parquet)

# OpenTelemetry at mishmash io

OpenTelemetry's main intent is the observability of production environments, but at [mishmash io](https://mishmash.io) it is part of our software development process. By saving telemetry from **experiments** and **tests** of
our own algorithms we ensure things like **performance** and **resource usage** of our distributed database, continuously and across releases.

We believe that adopting OpenTelemetry as a software development tool might be useful to you too, which is why we decided to open-source the tools we've built.

Learn more about the broader set of [OpenTelemetry-related activities](https://mishmash.io/open_source/opentelemetry) at
[mishmash io](https://mishmash.io/) and `follow` [GitHub profile](https://github.com/mishmash-io) for updates and new releases.

- [embeddable OpenTelemetry collectors](./collector-embedded)
- [Apache Parquet Stand-alone server](./server-parquet)
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.auth.User;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.grpc.common.GrpcStatus;
Expand Down Expand Up @@ -141,6 +144,12 @@ public abstract class AbstractCollector<
*/
private static final String CTX_OTEL_CONTEXT = "otelContext";

/**
* Vertx context parameter key for the user who is sending telemetry
* (if authentication was enabled).
*/
public static final String VCTX_EMITTER = "emitter";

/**
* The helper object for own telemetry.
*/
Expand Down Expand Up @@ -483,6 +492,12 @@ protected Batch<RECORD> publish(
span.setAttribute("otel.transport", transport);
span.setAttribute("otel.encoding", encoding);

User user = Vertx.currentContext().get(VCTX_EMITTER);
span.setAttribute("otel.is_authenticated", user != null);
if (user != null) {
setSpanAttributesForUser(span, user);
}

minDemand.record(estimateMinimumDemand());

try {
Expand Down Expand Up @@ -1227,6 +1242,44 @@ protected Context getOtelContext(final RoutingContext ctx) {
return ctx.get(CTX_OTEL_CONTEXT);
}

/**
* Set own-telemetry span attributes when an authenticated
* user is present.
*
* @param span the span to set the attributes to
* @param user the authenticated user who is emitting telemetry
*/
protected void setSpanAttributesForUser(
final Span span,
final User user) {
JsonObject principal = user.principal();

if (principal.containsKey("iss")) {
span.setAttribute("otel.auth.issuer",
principal.getString("iss"));
}

if (principal.containsKey("aud")) {
span.setAttribute("otel.auth.audience",
principal.getString("aud"));
}

if (principal.containsKey("sub")) {
span.setAttribute("otel.auth.subject",
principal.getString("sub"));
}

if (principal.containsKey("azp")) {
span.setAttribute("otel.auth.authorized_party",
principal.getString("azp"));
}

if (principal.containsKey("preferred_username")) {
span.setAttribute("otel.auth.preferred_username",
principal.getString("preferred_username"));
}
}

/**
* Parses the Content-Length header of an incoming HTTP request.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.proto.logs.v1.ScopeLogs;
import io.opentelemetry.proto.resource.v1.Resource;
import io.vertx.ext.auth.User;

/**
* Holds the details of an individual OTLP log record
Expand Down Expand Up @@ -80,9 +81,13 @@ public class Log extends SubscribersBatch<Log> {
* @param parent parent {@link Batch}
* @param otelContext {@link io.opentelemetry.context.Context} for own
* telemetry
* @param authUser the authenticated user or null if auth wasn't enabled
*/
public Log(final Batch<Log> parent, final Context otelContext) {
super(parent, otelContext);
public Log(
final Batch<Log> parent,
final Context otelContext,
final User authUser) {
super(parent, otelContext, authUser);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.opentelemetry.proto.logs.v1.LogRecord;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.proto.logs.v1.ScopeLogs;
import io.vertx.core.Vertx;

/**
* Processes incoming OpenTelemetry logs packets - extracts individual
Expand Down Expand Up @@ -108,7 +109,9 @@ public Batch<Log> loadBatch(
Span recordSpan = getInstrumentation()
.startNewSpan("otel.record");

Log l = new Log(batch, Context.current());
Log l = new Log(batch,
Context.current(),
Vertx.currentContext().get(VCTX_EMITTER));
l.setFrom(
timestamp,
uuid,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.opentelemetry.proto.metrics.v1.Summary;
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
import io.opentelemetry.proto.resource.v1.Resource;
import io.vertx.ext.auth.User;

/**
* Holds the details of an individual OTLP metric data point
Expand Down Expand Up @@ -146,11 +147,13 @@ public class MetricDataPoint extends SubscribersBatch<MetricDataPoint> {
* @param parent parent {@link Batch}
* @param otelContext {@link io.opentelemetry.context.Context} for own
* telemetry
* @param authUser the authenticated user or null if auth wasn't enabled
*/
public MetricDataPoint(
final Batch<MetricDataPoint> parent,
final Context otelContext) {
super(parent, otelContext);
final Context otelContext,
final User authUser) {
super(parent, otelContext, authUser);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.opentelemetry.proto.metrics.v1.Sum;
import io.opentelemetry.proto.metrics.v1.Summary;
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
import io.vertx.core.Vertx;

/**
* Processes incoming OpenTelemetry metrics packets - extracts all
Expand Down Expand Up @@ -141,7 +142,9 @@ public Batch<MetricDataPoint> loadBatch(

MetricDataPoint m = new MetricDataPoint(
batch,
Context.current());
Context.current(),
Vertx.currentContext()
.get(VCTX_EMITTER));
m.setFrom(timestamp,
uuid,
seqNo,
Expand Down Expand Up @@ -174,7 +177,9 @@ public Batch<MetricDataPoint> loadBatch(

MetricDataPoint m = new MetricDataPoint(
batch,
Context.current());
Context.current(),
Vertx.currentContext()
.get(VCTX_EMITTER));
m.setFrom(timestamp,
uuid,
seqNo,
Expand Down Expand Up @@ -207,7 +212,9 @@ public Batch<MetricDataPoint> loadBatch(

MetricDataPoint m = new MetricDataPoint(
batch,
Context.current());
Context.current(),
Vertx.currentContext()
.get(VCTX_EMITTER));
m.setFrom(timestamp,
uuid,
seqNo,
Expand Down Expand Up @@ -240,7 +247,9 @@ public Batch<MetricDataPoint> loadBatch(

MetricDataPoint m = new MetricDataPoint(
batch,
Context.current());
Context.current(),
Vertx.currentContext()
.get(VCTX_EMITTER));
m.setFrom(timestamp,
uuid,
seqNo,
Expand Down Expand Up @@ -272,7 +281,9 @@ public Batch<MetricDataPoint> loadBatch(

MetricDataPoint m = new MetricDataPoint(
batch,
Context.current());
Context.current(),
Vertx.currentContext()
.get(VCTX_EMITTER));
m.setFrom(timestamp,
uuid,
seqNo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.opentelemetry.proto.profiles.v1experimental.Sample;
import io.opentelemetry.proto.profiles.v1experimental.ScopeProfiles;
import io.opentelemetry.proto.resource.v1.Resource;
import io.vertx.ext.auth.User;

/**
* Holds an individual OTLP Profile Sample value as a {@link Batch}
Expand Down Expand Up @@ -115,11 +116,13 @@ public class ProfileSampleValue
* @param parent the parent {@link Batch}
* @param otelContext {@link io.opentelemetry.context.Context} for own
* telemetry
* @param authUser the authenticated user or null if auth wasn't enabled
*/
public ProfileSampleValue(
final Batch<ProfileSampleValue> parent,
final Context otelContext) {
super(parent, otelContext);
final Context otelContext,
final User authUser) {
super(parent, otelContext, authUser);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.opentelemetry.proto.profiles.v1experimental.ResourceProfiles;
import io.opentelemetry.proto.profiles.v1experimental.Sample;
import io.opentelemetry.proto.profiles.v1experimental.ScopeProfiles;
import io.vertx.core.Vertx;

/**
* Processes incoming OpenTelemetry profiles packets - extracts individual
Expand Down Expand Up @@ -137,7 +138,9 @@ protected Batch<ProfileSampleValue> loadBatch(
ProfileSampleValue lv =
new ProfileSampleValue(
batch,
Context.current());
Context.current(),
Vertx.currentContext()
.get(VCTX_EMITTER));
lv.setFrom(
timestamp,
uuid,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.opentelemetry.proto.resource.v1.Resource;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.proto.trace.v1.ScopeSpans;
import io.vertx.ext.auth.User;

/**
* Holds the details of an individual OTLP span record
Expand Down Expand Up @@ -79,9 +80,13 @@ public class Span extends SubscribersBatch<Span> {
* @param parent parent {@link Batch}
* @param otelContext {@link io.opentelemetry.context.Context} for own
* telemetry
* @param authUser the authenticated user or null if auth wasn't enabled
*/
public Span(final Batch<Span> parent, final Context otelContext) {
super(parent, otelContext);
public Span(
final Batch<Span> parent,
final Context otelContext,
final User authUser) {
super(parent, otelContext, authUser);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.Flow.Subscriber;

import io.opentelemetry.context.Context;
import io.vertx.ext.auth.User;

/**
* A 'batch' of all {@link LogsSubscriber}s, {@link MetricsSubscriber}s,
Expand All @@ -31,18 +32,27 @@
*/
public class SubscribersBatch<T> extends Batch<Subscriber<? super T>> {

/**
* Holds the user who emitted this signal (if authentication was enabled).
*/
private User user;

/**
* Create a new 'batch' of subscribers.
*
* @param parent the batch of OpenTelemetry data
* @param otelContext {@link io.opentelemetry.context.Context} for
* @param authUser the authenticated user or null if auth wasn't enabled
* own telemetry
*/
public SubscribersBatch(
final Batch<T> parent,
final Context otelContext) {
final Context otelContext,
final User authUser) {
super(otelContext);

this.user = authUser;

@SuppressWarnings("unchecked")
T self = (T) this;

Expand All @@ -65,4 +75,22 @@ protected void otelComplete(final Throwable t) {
* span in a subscriber
*/
}

/**
* Returns the authenticated user (if authentication was enabled).
*
* @return the user or null if authentication was not enabled.
*/
public User getUser() {
return user;
}

/**
* Set the authenticated user.
*
* @param authUser the user
*/
public void setUser(final User authUser) {
this.user = authUser;
}
}
Loading

0 comments on commit 4969c4c

Please sign in to comment.