Skip to content

Commit

Permalink
Add support to use trace propagated from client
Browse files Browse the repository at this point in the history
Signed-off-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gagan Juneja committed Aug 23, 2023
1 parent 5d3633c commit 2a2cd29
Show file tree
Hide file tree
Showing 16 changed files with 242 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.telemetry.tracing;

import org.opensearch.telemetry.tracing.attributes.Attributes;
import org.opensearch.telemetry.tracing.http.HttpHeader;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -44,13 +45,13 @@ public SpanScope startSpan(String spanName) {

@Override
public SpanScope startSpan(String spanName, Attributes attributes) {
return startSpan(spanName, null, attributes);
return startSpan(spanName, SpanContext.EMPTY, attributes);
}

@Override
public SpanScope startSpan(String spanName, SpanContext parentSpan, Attributes attributes) {
Span span = null;
if (parentSpan != null) {
if (parentSpan != null && parentSpan.getSpan() != null) {
span = createSpan(spanName, parentSpan.getSpan(), attributes);
} else {
span = createSpan(spanName, getCurrentSpanInternal(), attributes);
Expand Down Expand Up @@ -97,4 +98,10 @@ protected void addDefaultAttributes(Span span) {
span.addAttribute(THREAD_NAME, Thread.currentThread().getName());
}

@Override
public SpanScope startSpan(String spanName, HttpHeader header, Attributes attributes) {
Span propagatedSpan = tracingTelemetry.getContextPropagator().extract(header);
return startSpan(spanName, new SpanContext(propagatedSpan), attributes);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
*/
public final class SpanContext {
private final Span span;
/**
* Empty {@link SpanContext}
*/
public static final SpanContext EMPTY = new SpanContext(null);

/**
* Constructor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.telemetry.tracing;

import org.opensearch.telemetry.tracing.attributes.Attributes;
import org.opensearch.telemetry.tracing.http.HttpTracer;

import java.io.Closeable;

Expand All @@ -18,7 +19,7 @@
*
* All methods on the Tracer object are multi-thread safe.
*/
public interface Tracer extends Closeable {
public interface Tracer extends HttpTracer, Closeable {

/**
* Starts the {@link Span} with given name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.telemetry.tracing;

import org.opensearch.telemetry.tracing.http.HttpHeader;

import java.util.Map;
import java.util.function.BiConsumer;

Expand All @@ -25,6 +27,13 @@ public interface TracingContextPropagator {
*/
Span extract(Map<String, String> props);

/**
* Extracts current span http header.
* @param httpHeader properties
* @return current span
*/
Span extract(HttpHeader httpHeader);

/**
* Injects tracing context
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.http;

import java.util.List;
import java.util.Map;

/**
* Represents the Http Header.
*/
public class HttpHeader {
private final Map<String, List<String>> header;

/**
* Constructor
* @param header header map.
*/
public HttpHeader(Map<String, List<String>> header) {
this.header = header;
}

/**
* Returns header map.
* @return header.
*/
public Map<String, List<String>> getHeader() {
return header;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.http;

import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.attributes.Attributes;

/**
* HttpTracer helps in creating a {@link Span} which reads the incoming tracing information
* from the HttpRequest header and propagate the span accordingly.
*
* All methods on the Tracer object are multi-thread safe.
*/
public interface HttpTracer {
/**
* Start the span with propagating the tracing info from the HttpRequest header.
*
* @param spanName span name.
* @param header http request header.
* @param attributes span attributes.
* @return scope of the span, must be closed with explicit close or with try-with-resource
*/
SpanScope startSpan(String spanName, HttpHeader header, Attributes attributes);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Contains No-op implementations
*/
package org.opensearch.telemetry.tracing.http;
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.attributes.Attributes;
import org.opensearch.telemetry.tracing.http.HttpHeader;

/**
* No-op implementation of Tracer
Expand Down Expand Up @@ -51,4 +52,9 @@ public SpanContext getCurrentSpan() {
public void close() {

}

@Override
public SpanScope startSpan(String spanName, HttpHeader header, Attributes attributes) {
return SpanScope.NO_OP;

Check warning on line 58 in libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java

View check run for this annotation

Codecov / codecov/patch

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java#L58

Added line #L58 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.telemetry.tracing.attributes.Attributes;
import org.opensearch.telemetry.tracing.http.HttpHeader;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.telemetry.tracing.MockSpan;
import org.opensearch.test.telemetry.tracing.MockTracingTelemetry;
import org.junit.Assert;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -104,14 +109,37 @@ public void testCreateSpanWithParent() {
Assert.assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan().getParentSpan());
}

public void testHttpTracer() {
String traceId = "trace_id";
String spanId = "span_id";
TracingTelemetry tracingTelemetry = new MockTracingTelemetry();

DefaultTracer defaultTracer = new DefaultTracer(
tracingTelemetry,
new ThreadContextBasedTracerContextStorage(new ThreadContext(Settings.EMPTY), tracingTelemetry)
);

Map<String, List<String>> requestHeaders = new HashMap<>();
requestHeaders.put("traceparent", Arrays.asList(traceId + "~" + spanId));
HttpHeader header = new HttpHeader(requestHeaders);

SpanScope spanScope = defaultTracer.startSpan("test_span", header, Attributes.EMPTY);
SpanContext currentSpan = defaultTracer.getCurrentSpan();
assertNotNull(currentSpan);
assertEquals(traceId, currentSpan.getSpan().getTraceId());
assertEquals(traceId, currentSpan.getSpan().getParentSpan().getTraceId());
assertEquals(spanId, currentSpan.getSpan().getParentSpan().getSpanId());
spanScope.close();
}

public void testCreateSpanWithNullParent() {
TracingTelemetry tracingTelemetry = new MockTracingTelemetry();
DefaultTracer defaultTracer = new DefaultTracer(
tracingTelemetry,
new ThreadContextBasedTracerContextStorage(new ThreadContext(Settings.EMPTY), tracingTelemetry)
);

defaultTracer.startSpan("span_name", null, Attributes.EMPTY);
defaultTracer.startSpan("span_name", SpanContext.EMPTY, Attributes.EMPTY);

Assert.assertEquals("span_name", defaultTracer.getCurrentSpan().getSpan().getSpanName());
Assert.assertEquals(null, defaultTracer.getCurrentSpan().getSpan().getParentSpan());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void testRunnableWithParent() throws Exception {
DefaultTracer defaultTracer = new DefaultTracer(new MockTracingTelemetry(), contextStorage);
defaultTracer.startSpan(parentSpanName);
SpanContext parentSpan = defaultTracer.getCurrentSpan();
AtomicReference<SpanContext> currrntSpan = new AtomicReference<>(new SpanContext(null));
AtomicReference<SpanContext> currrntSpan = new AtomicReference<>(SpanContext.EMPTY);
final AtomicBoolean isRunnableCompleted = new AtomicBoolean(false);
TraceableRunnable traceableRunnable = new TraceableRunnable(defaultTracer, spanName, parentSpan, Attributes.EMPTY, () -> {
isRunnableCompleted.set(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@

package org.opensearch.telemetry.tracing;

import org.opensearch.core.common.Strings;
import org.opensearch.telemetry.tracing.http.HttpHeader;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

Expand All @@ -34,13 +39,25 @@ public OTelTracingContextPropagator(OpenTelemetry openTelemetry) {
@Override
public Span extract(Map<String, String> props) {
Context context = openTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), props, TEXT_MAP_GETTER);
return getPropagatedSpan(context);
}

private static OTelPropagatedSpan getPropagatedSpan(Context context) {
if (context != null) {
io.opentelemetry.api.trace.Span span = io.opentelemetry.api.trace.Span.fromContext(context);
return new OTelPropagatedSpan(span);
}
return null;
}

@Override
public Span extract(HttpHeader httpHeader) {
Context context = openTelemetry.getPropagators()
.getTextMapPropagator()
.extract(Context.current(), httpHeader, HTTP_HEADER_MAP_GETTER);
return getPropagatedSpan(context);
}

@Override
public void inject(Span currentSpan, BiConsumer<String, String> setter) {
openTelemetry.getPropagators().getTextMapPropagator().inject(context((OTelSpan) currentSpan), setter, TEXT_MAP_SETTER);
Expand Down Expand Up @@ -72,4 +89,25 @@ public String get(Map<String, String> headers, String key) {
}
};

private static final TextMapGetter<HttpHeader> HTTP_HEADER_MAP_GETTER = new TextMapGetter<>() {
@Override
public Iterable<String> keys(HttpHeader httpHeader) {
Map<String, List<String>> headerMap = httpHeader.getHeader();

Check warning on line 95 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java#L95

Added line #L95 was not covered by tests
if (headerMap != null) {
return httpHeader.getHeader().keySet();

Check warning on line 97 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java#L97

Added line #L97 was not covered by tests
} else {
return Collections.emptySet();

Check warning on line 99 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java#L99

Added line #L99 was not covered by tests
}
}

@Override
public String get(HttpHeader httpHeader, String key) {
Map<String, List<String>> headerMap = httpHeader.getHeader();
if (headerMap != null && headerMap.containsKey(key)) {
return Strings.collectionToCommaDelimitedString(headerMap.get(key));
}
return null;
}
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@

package org.opensearch.telemetry.tracing;

import org.opensearch.telemetry.tracing.http.HttpHeader;
import org.opensearch.test.OpenSearchTestCase;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.opentelemetry.api.OpenTelemetry;
Expand All @@ -19,6 +22,7 @@
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;

import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -52,4 +56,38 @@ public void testExtractTracerContextFromHeader() {
assertEquals(TRACE_ID, span.getTraceId());
assertEquals(SPAN_ID, span.getSpanId());
}

public void testExtractTracerContextFromHttpHeader() {
Map<String, List<String>> requestHeaders = new HashMap<>();
requestHeaders.put("traceparent", Arrays.asList("00-" + TRACE_ID + "-" + SPAN_ID + "-00"));
HttpHeader header = new HttpHeader(requestHeaders);
OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class);
when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance()));
TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry);
org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extract(header);
assertEquals(TRACE_ID, span.getTraceId());
assertEquals(SPAN_ID, span.getSpanId());
}

public void testExtractTracerContextFromHttpHeaderNull() {
OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class);
when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance()));
TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry);
org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extract(new HttpHeader(null));
org.opensearch.telemetry.tracing.Span propagatedSpan = new OTelPropagatedSpan(Span.fromContext(Context.root()));
assertEquals(propagatedSpan.getTraceId(), span.getTraceId());
assertEquals(propagatedSpan.getSpanId(), span.getSpanId());
}

public void testExtractTracerContextFromHttpHeaderEmpty() {
Map<String, List<String>> requestHeaders = new HashMap<>();
HttpHeader header = new HttpHeader(requestHeaders);
OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class);
when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance()));
TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry);
org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extract(header);
org.opensearch.telemetry.tracing.Span propagatedSpan = new OTelPropagatedSpan(Span.fromContext(Context.root()));
assertEquals(propagatedSpan.getTraceId(), span.getTraceId());
assertEquals(propagatedSpan.getSpanId(), span.getSpanId());
}
}
Loading

0 comments on commit 2a2cd29

Please sign in to comment.