Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds minimal traceparent header support to Elasticsearch #74210

Merged
merged 15 commits into from
Jun 21, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public void testDeprecatedMessageWithoutXOpaqueId() throws IOException {
public void testCompatibleLog() throws Exception {
withThreadContext(threadContext -> {
threadContext.putHeader(Task.X_OPAQUE_ID, "someId");
threadContext.putTransient(Task.TRACE_ID, "someTraceId");
final DeprecationLogger testLogger = DeprecationLogger.getLogger("org.elasticsearch.test");
testLogger.deprecate(DeprecationCategory.OTHER,"someKey", "deprecated message1")
.compatibleApiWarning("compatibleKey","compatible API message");
Expand Down Expand Up @@ -143,6 +144,7 @@ public void testCompatibleLog() throws Exception {
hasEntry("message", "deprecated message1"),
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "someKey"),
hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"),
hasEntry(Task.TRACE_ID, "someTraceId"),
hasEntry("elasticsearch.event.category", "other")
),
allOf(
Expand All @@ -159,6 +161,7 @@ public void testCompatibleLog() throws Exception {
hasEntry("message", "compatible API message"),
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "compatibleKey"),
hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"),
hasEntry(Task.TRACE_ID, "someTraceId"),
hasEntry("elasticsearch.event.category", "compatible_api")
)
)
Expand All @@ -172,6 +175,7 @@ public void testCompatibleLog() throws Exception {
public void testParseFieldEmittingDeprecatedLogs() throws Exception {
withThreadContext(threadContext -> {
threadContext.putHeader(Task.X_OPAQUE_ID, "someId");
threadContext.putTransient(Task.TRACE_ID, "someTraceId");

ParseField deprecatedField = new ParseField("new_name", "deprecated_name");
assertTrue(deprecatedField.match("deprecated_name", LoggingDeprecationHandler.INSTANCE));
Expand Down Expand Up @@ -208,6 +212,7 @@ public void testParseFieldEmittingDeprecatedLogs() throws Exception {
hasEntry("message", "Deprecated field [deprecated_name] used, expected [new_name] instead"),
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "deprecated_field_deprecated_name"),
hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"),
hasEntry(Task.TRACE_ID, "someTraceId"),
hasEntry("elasticsearch.event.category", "api")
),
// deprecation log for field deprecated_name2 (note it is not being throttled)
Expand All @@ -224,6 +229,7 @@ public void testParseFieldEmittingDeprecatedLogs() throws Exception {
hasEntry("message", "Deprecated field [deprecated_name2] used, expected [new_name] instead"),
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "deprecated_field_deprecated_name2"),
hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"),
hasEntry(Task.TRACE_ID, "someTraceId"),
hasEntry("elasticsearch.event.category", "api")
),
// compatible log line
Expand All @@ -240,6 +246,7 @@ public void testParseFieldEmittingDeprecatedLogs() throws Exception {
hasEntry("message", "Deprecated field [compatible_deprecated_name] used, expected [new_name] instead"),
hasEntry(DeprecatedMessage.KEY_FIELD_NAME, "deprecated_field_compatible_deprecated_name"),
hasEntry(DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME, "someId"),
hasEntry(Task.TRACE_ID, "someTraceId"),
hasEntry("elasticsearch.event.category", "compatible_api")
)
)
Expand All @@ -255,6 +262,7 @@ public void testParseFieldEmittingDeprecatedLogs() throws Exception {
public void testDeprecatedMessage() throws Exception {
withThreadContext(threadContext -> {
threadContext.putHeader(Task.X_OPAQUE_ID, "someId");
threadContext.putTransient(Task.TRACE_ID, "someTraceId");
final DeprecationLogger testLogger = DeprecationLogger.getLogger("org.elasticsearch.test");
testLogger.deprecate(DeprecationCategory.OTHER, "someKey", "deprecated message1");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,10 @@ public ActionModule(Settings settings, IndexNameExpressionResolver indexNameExpr
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<RestHeaderDefinition> headers = Stream.concat(
actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
Stream.of(new RestHeaderDefinition(Task.X_OPAQUE_ID, false))
Stream.of(
new RestHeaderDefinition(Task.X_OPAQUE_ID, false),
new RestHeaderDefinition(Task.TRACE_PARENT, false)
)
).collect(Collectors.toSet());
UnaryOperator<RestHandler> restWrapper = null;
for (ActionPlugin plugin : actionPlugins) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ public EcsLayout build() {
private KeyValuePair[] additionalFields() {
return new KeyValuePair[] {
new KeyValuePair("event.dataset", dataset),
new KeyValuePair("trace.id", "%trace_id"),
new KeyValuePair("elasticsearch.cluster.uuid", "%cluster_id"),
new KeyValuePair("elasticsearch.node.id", "%node_id"),
new KeyValuePair("elasticsearch.node.name", "%ESnode_name"),
new KeyValuePair("elasticsearch.cluster.name", "${sys:es.logs.cluster_name}"), };
}
}

public String getDataset() {
return dataset;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.logging;

import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.pattern.ConverterKeys;
import org.apache.logging.log4j.core.pattern.LogEventPatternConverter;
import org.apache.logging.log4j.core.pattern.PatternConverter;
import org.elasticsearch.tasks.Task;

import java.util.Objects;

/**
* Pattern converter to format the trace id provided in the traceparent header into JSON fields <code>trace.id</code>.
*/
@Plugin(category = PatternConverter.CATEGORY, name = "TraceIdConverter")
@ConverterKeys({"trace_id"})
public final class TraceIdConverter extends LogEventPatternConverter {
/**
* Called by log4j2 to initialize this converter.
*/
public static TraceIdConverter newInstance(@SuppressWarnings("unused") final String[] options) {
return new TraceIdConverter();
}

public TraceIdConverter() {
super("trace_id", "trace_id");
}

public static String getTraceId() {
return HeaderWarning.THREAD_CONTEXT.stream()
.map(t -> t.<String>getTransient(Task.TRACE_ID))
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
}

/**
* Formats the trace.id into json fields.
*
* @param event - a log event is ignored in this method as it uses the clusterId value
* from <code>NodeAndClusterIdStateListener</code> to format
*/
@Override
public void format(LogEvent event, StringBuilder toAppendTo) {
String traceId = getTraceId();
if (traceId != null) {
toAppendTo.append(traceId);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,19 @@ public StoredContext stashContext() {
* This is needed so the DeprecationLogger in another thread can see the value of X-Opaque-ID provided by a user.
* Otherwise when context is stash, it should be empty.
*/
if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID)) {
ThreadContextStruct threadContextStruct =
DEFAULT_CONTEXT.putHeaders(Map.of(Task.X_OPAQUE_ID, context.requestHeaders.get(Task.X_OPAQUE_ID)));

if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID) || context.requestHeaders.containsKey(Task.TRACE_PARENT)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also need for TRACE_ID ?

Copy link
Member

@felixbarny felixbarny Jun 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so as TRACE_ID a transient header. It's not getting in our out of a single node. It's just used for log correlation.

Map<String, String> map = new HashMap<>(2, 1);
if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID)) {
map.put(Task.X_OPAQUE_ID, context.requestHeaders.get(Task.X_OPAQUE_ID));
}
if (context.requestHeaders.containsKey(Task.TRACE_PARENT)) {
map.put(Task.TRACE_PARENT, context.requestHeaders.get(Task.TRACE_PARENT));
}
ThreadContextStruct threadContextStruct = DEFAULT_CONTEXT.putHeaders(map);
threadLocal.set(threadContextStruct);
} else {
}
else {
threadLocal.set(DEFAULT_CONTEXT);
}
return () -> {
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ protected Node(final Environment initialEnvironment,
final Transport transport = networkModule.getTransportSupplier().get();
Set<String> taskHeaders = Stream.concat(
pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
Stream.of(Task.X_OPAQUE_ID)
Stream.of(Task.X_OPAQUE_ID, Task.TRACE_PARENT)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this make sure the header gets propagated to other downstream ES nodes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Afaik this makes sure the headers go over the transport wire to other nodes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though it would technically render the traceparent header invalid, we may want to zero out the parent-id part of the header when sending to downstream nodes.

Kibana 
  |
traceparent: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01
  |
  v
ES Node1
  |
traceparent: 00-0af7651916cd43dd8448eb211c80319c-0000000000000000-01
  |
  v
ES Node 2

That is because ES is not creating spans currently and thus there's no parent.
Based on the outcome of this discussion (#74210 (comment)), we don't need that part of the traceparent header anyway.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed transaction.id.

I personally prefer not zeroing out and treat ES as pass-through for now given this piece of information is no longer used.

).collect(Collectors.toSet());
final TransportService transportService = newTransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.rest.RestHandler.Route;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.usage.UsageService;

import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -344,6 +345,12 @@ private void tryAllHandlers(final RestRequest request, final RestChannel channel
BytesRestResponse.
createSimpleErrorResponse(channel, BAD_REQUEST, "multiple values for single-valued header [" + name + "]."));
return;
} else if (name.equals(Task.TRACE_PARENT)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we could extract the whole headers copy to a separate method?

String traceparent = distinctHeaderValues.get(0);
if (traceparent.length() >= 55) {
threadContext.putTransient(Task.TRACE_ID, traceparent.substring(3, 35));
Mpdreamz marked this conversation as resolved.
Show resolved Hide resolved
}
threadContext.putHeader(name, traceparent);
} else {
threadContext.putHeader(name, String.join(",", distinctHeaderValues));
}
Expand Down
4 changes: 4 additions & 0 deletions server/src/main/java/org/elasticsearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public class Task {
*/
public static final String X_OPAQUE_ID = "X-Opaque-Id";

public static final String TRACE_PARENT = "traceparent";

public static final String TRACE_ID = "trace.id";

private final long id;

private final String type;
Expand Down