Skip to content

Commit

Permalink
[#3597] Add more generic AMQP trace context propagation format.
Browse files Browse the repository at this point in the history
Writing trace-context information in the AMQP message
application-properties instead of the message-annotations, if
isUseLegacyTraceContextFormat has been set to false.
  • Loading branch information
calohmn committed Jan 15, 2024
1 parent b26cd94 commit 0e11502
Show file tree
Hide file tree
Showing 12 changed files with 370 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2016 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -340,7 +340,8 @@ private Future<ProtonDelivery> checkForCreditAndSend(
TracingHelper.TAG_QOS.set(currentSpan, sender.getQoS().toString());
Tags.SPAN_KIND.set(currentSpan, Tags.SPAN_KIND_PRODUCER);
TracingHelper.setDeviceTags(currentSpan, tenantId, AmqpUtils.getDeviceId(message));
AmqpUtils.injectSpanContext(connection.getTracer(), currentSpan.context(), message);
AmqpUtils.injectSpanContext(connection.getTracer(), currentSpan.context(), message,
connection.getConfig().isUseLegacyTraceContextFormat());

return connection.executeOnContext(result -> {
if (sender.sendQueueFull()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2016 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -638,7 +638,8 @@ private Future<R> sendRequest(
currentSpan.log(details);

final TriTuple<Promise<R>, BiFunction<Message, ProtonDelivery, R>, Span> handler = TriTuple.of(res, responseMapper, currentSpan);
AmqpUtils.injectSpanContext(connection.getTracer(), currentSpan.context(), request);
AmqpUtils.injectSpanContext(connection.getTracer(), currentSpan.context(), request,
connection.getConfig().isUseLegacyTraceContextFormat());
replyMap.put(correlationId, handler);

final SendMessageSampler.Sample sample = sampler.start(tenantId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2016 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -90,6 +90,10 @@ public class ClientConfigProperties extends AuthenticatingClientConfigProperties
* size.
*/
public static final long MIN_MAX_MESSAGE_SIZE_NONE = 0;
/**
* The default value for deciding whether to use the legacy trace context format.
*/
public static final boolean DEFAULT_USE_LEGACY_TRACE_CONTEXT_FORMAT = true;

private Pattern addressRewritePattern;
private String addressRewriteReplacement;
Expand All @@ -111,6 +115,7 @@ public class ClientConfigProperties extends AuthenticatingClientConfigProperties
private long reconnectDelayIncrement = DEFAULT_RECONNECT_DELAY_INCREMENT;
private long requestTimeout = DEFAULT_REQUEST_TIMEOUT;
private long sendMessageTimeout = DEFAULT_SEND_MESSAGE_TIMEOUT;
private boolean useLegacyTraceContextFormat = DEFAULT_USE_LEGACY_TRACE_CONTEXT_FORMAT;

/**
* Creates new properties using default values.
Expand Down Expand Up @@ -147,6 +152,7 @@ public ClientConfigProperties(final ClientConfigProperties otherProperties) {
this.reconnectDelayIncrement = otherProperties.reconnectDelayIncrement;
this.requestTimeout = otherProperties.requestTimeout;
this.sendMessageTimeout = otherProperties.sendMessageTimeout;
this.useLegacyTraceContextFormat = otherProperties.useLegacyTraceContextFormat;
}

/**
Expand Down Expand Up @@ -174,6 +180,7 @@ public ClientConfigProperties(final ClientOptions options) {
setReconnectDelayIncrement(options.reconnectDelayIncrement());
setRequestTimeout(options.requestTimeout());
setSendMessageTimeout(options.sendMessageTimeout());
setUseLegacyTraceContextFormat(options.useLegacyTraceContextFormat());
}

/**
Expand Down Expand Up @@ -802,4 +809,24 @@ public final int getMaxSessionWindowSize() {
return 0;
}
}

/**
* Checks whether the legacy trace context format shall be used, writing to the message annotations instead of the
* application properties.
*
* @return {@code true} if the legacy format shall be used.
*/
public final boolean isUseLegacyTraceContextFormat() {
return useLegacyTraceContextFormat;
}

/**
* Sets whether the legacy trace context format shall be used, writing to the message annotations instead of the
* application properties.
*
* @param useLegacyTraceContextFormat {@code true} if the legacy format shall be used.
*/
public final void setUseLegacyTraceContextFormat(final boolean useLegacyTraceContextFormat) {
this.useLegacyTraceContextFormat = useLegacyTraceContextFormat;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2021, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -217,4 +217,13 @@ public interface ClientOptions {
*/
@WithDefault("-1")
int maxSessionFrames();

/**
* Checks whether the legacy trace context format shall be used, writing to the message annotations instead of the
* application properties.
*
* @return {@code true} if the legacy format shall be used.
*/
@WithDefault("true")
boolean useLegacyTraceContextFormat();
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
import org.eclipse.hono.auth.Authorities;
import org.eclipse.hono.auth.HonoUser;
import org.eclipse.hono.auth.HonoUserAdapter;
import org.eclipse.hono.client.amqp.tracing.MessageAnnotationsExtractAdapter;
import org.eclipse.hono.client.amqp.tracing.AmqpMessageExtractAdapter;
import org.eclipse.hono.client.amqp.tracing.AmqpMessageInjectAdapter;
import org.eclipse.hono.client.amqp.tracing.MessageAnnotationsInjectAdapter;
import org.eclipse.hono.util.CacheDirective;
import org.eclipse.hono.util.CommandConstants;
Expand All @@ -45,6 +46,7 @@
import io.opentracing.Tracer;
import io.opentracing.noop.NoopSpanContext;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonConnection;
Expand Down Expand Up @@ -117,7 +119,7 @@ public Authorities getAuthorities() {
}
};

private static final String AMQP_ANNOTATION_NAME_TRACE_CONTEXT = "x-opt-trace-context";
private static final String LEGACY_AMQP_ANNOTATION_NAME_TRACE_CONTEXT = "x-opt-trace-context";

private AmqpUtils() {
// prevent instantiation
Expand Down Expand Up @@ -170,28 +172,34 @@ public static void setClientPrincipal(final ProtonConnection con, final HonoUser
/**
* Injects a {@code SpanContext} into an AMQP {@code Message}.
* <p>
* The span context will be written to the message annotations of the given message.
* The span context will be written either to the message annotations (if {@code useLegacyTraceContextFormat} is
* {@code true}) or the application properties of the given message.
*
* @param tracer The Tracer to use for injecting the context.
* @param spanContext The context to inject or {@code null} if no context is available.
* @param message The AMQP {@code Message} object to inject the context into.
* @param useLegacyTraceContextFormat If {@code true}, the legacy trace context format will be used (writing to
* a map in the message annotation properties).
* @throws NullPointerException if tracer or message is {@code null}.
*/
public static void injectSpanContext(final Tracer tracer, final SpanContext spanContext, final Message message) {
public static void injectSpanContext(final Tracer tracer, final SpanContext spanContext, final Message message,
final boolean useLegacyTraceContextFormat) {

Objects.requireNonNull(tracer);
Objects.requireNonNull(message);

if (spanContext != null && !(spanContext instanceof NoopSpanContext)) {
tracer.inject(spanContext, Format.Builtin.TEXT_MAP,
new MessageAnnotationsInjectAdapter(message, AMQP_ANNOTATION_NAME_TRACE_CONTEXT));
final TextMap injectAdapter = useLegacyTraceContextFormat
? new MessageAnnotationsInjectAdapter(message, LEGACY_AMQP_ANNOTATION_NAME_TRACE_CONTEXT)
: new AmqpMessageInjectAdapter(message);
tracer.inject(spanContext, Format.Builtin.TEXT_MAP, injectAdapter);
}
}

/**
* Extracts a {@code SpanContext} from an AMQP {@code Message}.
* <p>
* The span context will be read from the message annotations of the given message.
* The span context will be read from the message annotations or the application properties of the given message.
*
* @param tracer The Tracer to use for extracting the context.
* @param message The AMQP {@code Message} to extract the context from.
Expand All @@ -204,7 +212,7 @@ public static SpanContext extractSpanContext(final Tracer tracer, final Message
Objects.requireNonNull(message);

return tracer.extract(Format.Builtin.TEXT_MAP,
new MessageAnnotationsExtractAdapter(message, AMQP_ANNOTATION_NAME_TRACE_CONTEXT));
new AmqpMessageExtractAdapter(message, LEGACY_AMQP_ANNOTATION_NAME_TRACE_CONTEXT));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*******************************************************************************
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************/

package org.eclipse.hono.client.amqp.tracing;

import java.util.AbstractMap;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;

import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.message.Message;

import io.opentracing.propagation.TextMap;

/**
* An adapter for extracting trace context information from the message annotations (representing the legacy format)
* or application properties of an AMQP 1.0 message.
*/
public final class AmqpMessageExtractAdapter implements TextMap {

private final Message message;
private final MessageAnnotationsExtractAdapter legacyAdapter;

/**
* Creates an adapter for a message.
* <p>
* Trace context information will be extracted from the application properties of the message.
*
* @param message The message.
* @throws NullPointerException if message is {@code null}.
*/
public AmqpMessageExtractAdapter(final Message message) {
this(message, null);
}

/**
* Creates an adapter for a message.
* <p>
* If the {@code legacyMessageAnnotationsPropertiesMapName} constructor parameter is not {@code null}, the
* information is extracted from the corresponding map in the message annotations, if that map exists.
* Otherwise, the application properties of the message will be used for extracting the trace context information.
*
* @param message The message.
* @param legacyMessageAnnotationsPropertiesMapName The name of the message annotation of type map from where to
* extract the trace properties (if that map exists).
* @throws NullPointerException if message is {@code null}.
*/
public AmqpMessageExtractAdapter(final Message message, final String legacyMessageAnnotationsPropertiesMapName) {
this.message = Objects.requireNonNull(message);
this.legacyAdapter = Optional.ofNullable(legacyMessageAnnotationsPropertiesMapName)
.map(mapName -> new MessageAnnotationsExtractAdapter(message, mapName))
.orElse(null);
}

@Override
public Iterator<Entry<String, String>> iterator() {

if (legacyAdapter != null) {
final Map<?, ?> legacyPropertiesMap = legacyAdapter.getMessageAnnotationsPropertiesMap();
if (!legacyPropertiesMap.isEmpty()) {
return mapEntriesIterator(legacyPropertiesMap.entrySet().iterator());
}
}

final ApplicationProperties applicationProperties = message.getApplicationProperties();
if (applicationProperties == null || applicationProperties.getValue() == null) {
return Collections.emptyIterator();
}
return mapEntriesIterator(applicationProperties.getValue().entrySet().iterator());
}

private static Iterator<Entry<String, String>> mapEntriesIterator(final Iterator<? extends Entry<?, ?>> entriesIterator) {
return new Iterator<>() {

@Override
public boolean hasNext() {
return entriesIterator.hasNext();
}

@Override
public Entry<String, String> next() {
final Entry<?, ?> nextEntry = entriesIterator.next();
return new AbstractMap.SimpleEntry<>(nextEntry.getKey().toString(), nextEntry.getValue().toString());
}
};
}

@Override
public void put(final String key, final String value) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*******************************************************************************
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************/
package org.eclipse.hono.client.amqp.tracing;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Objects;

import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.message.Message;

import io.opentracing.propagation.TextMap;

/**
* An adapter for injecting trace context information in the application properties of an AMQP 1.0 message.
*
*/
public class AmqpMessageInjectAdapter implements TextMap {

private final Message message;

/**
* Creates an adapter for a message.
*
* @param message The message.
* @throws NullPointerException if message is {@code null}.
*/
public AmqpMessageInjectAdapter(final Message message) {
this.message = Objects.requireNonNull(message);
}

@Override
public Iterator<Entry<String, String>> iterator() {
throw new UnsupportedOperationException();
}

@Override
public void put(final String key, final String value) {
final ApplicationProperties applicationProperties;
if (message.getApplicationProperties() != null && message.getApplicationProperties().getValue() != null) {
applicationProperties = message.getApplicationProperties();
} else {
applicationProperties = new ApplicationProperties(new HashMap<>());
message.setApplicationProperties(applicationProperties);
}
applicationProperties.getValue().put(key, value);
}
}
Loading

0 comments on commit 0e11502

Please sign in to comment.