Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More updates for the current OTel (OpenTelemetry) conventions #1717

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,27 @@ Here's the recommended workflow:
If what you are going to work on is a substantial change, please first
ask the core team for their opinion on the [RabbitMQ users mailing list][rmq-users].

### Building Source

To build the projects first, clone the repository. Make sure you clone the
repository and its submodules:

It is good practice to make sure you can build the project before making any
changes to confirm that your development environment is set
up correctly. Verifying that the tests pass is also a good practice (see
[RUNNING_TESTS.md](/RUNNING_TESTS.md)).

All together, this looks like:

```shell
git clone --recurse-submodules https://github.com/rabbitmq/rabbitmq-dotnet-client
cd rabbitmq-dotnet-client
dotnet build ./Build.csproj
./.ci/ubuntu/gha-setup.sh # On any Linux distribution with Docker installed
./.ci/windows/gha-setup.ps1 # On Windows
make test
```

### Running Tests

See [RUNNING_TESTS.md](/RUNNING_TESTS.md).
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);

using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.Send(routingKey, exchange, body.Length)
? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length)
: default;

ulong publishSequenceNumber = 0;
Expand Down Expand Up @@ -117,7 +117,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);

using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length)
? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length)
: default;

ulong publishSequenceNumber = 0;
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -918,10 +918,10 @@ await ModelSendAsync(in method, k.CancellationToken)
BasicGetResult? result = await k;

using Activity? activity = result != null
? RabbitMQActivitySource.Receive(result.RoutingKey,
? RabbitMQActivitySource.BasicGet(result.RoutingKey,
result.Exchange,
result.DeliveryTag, result.BasicProperties, result.Body.Length)
: RabbitMQActivitySource.ReceiveEmpty(queue);
: RabbitMQActivitySource.BasicGetEmpty(queue);

activity?.SetStartTime(k.StartTime);

Expand Down
44 changes: 27 additions & 17 deletions projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,15 @@ public static class RabbitMQActivitySource
// https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes
internal const string MessageId = "messaging.message.id";
internal const string MessageConversationId = "messaging.message.conversation_id";
internal const string MessagingOperation = "messaging.operation";
internal const string MessagingOperationName = "messaging.operation.name";
internal const string MessagingOperationNameBasicDeliver = "deliver";
internal const string MessagingOperationNameBasicGet = "fetch";
internal const string MessagingOperationNameBasicGetEmpty = "fetch (empty)";
internal const string MessagingOperationNameBasicPublish = "publish";
internal const string MessagingOperationType = "messaging.operation.type";
internal const string MessagingOperationTypeSend = "send";
internal const string MessagingOperationTypeProcess = "process";
internal const string MessagingOperationTypeReceive = "receive";
internal const string MessagingSystem = "messaging.system";
internal const string MessagingDestination = "messaging.destination.name";
internal const string MessagingDestinationRoutingKey = "messaging.rabbitmq.destination.routing_key";
Expand Down Expand Up @@ -53,7 +61,7 @@ public static class RabbitMQActivitySource
new KeyValuePair<string, object?>(ProtocolVersion, "0.9.1")
};

internal static Activity? Send(string routingKey, string exchange, int bodySize,
internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize,
ActivityContext linkedContext = default)
{
if (!s_publisherSource.HasListeners())
Expand All @@ -63,41 +71,42 @@ public static class RabbitMQActivitySource

Activity? activity = linkedContext == default
? s_publisherSource.StartRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{routingKey} publish" : "publish",
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicPublish} {routingKey}" : MessagingOperationNameBasicPublish,
ActivityKind.Producer)
: s_publisherSource.StartLinkedRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{routingKey} publish" : "publish",
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicPublish} {routingKey}" : MessagingOperationNameBasicPublish,
ActivityKind.Producer, linkedContext);
if (activity != null && activity.IsAllDataRequested)
{
PopulateMessagingTags("publish", routingKey, exchange, 0, bodySize, activity);
PopulateMessagingTags(MessagingOperationTypeSend, MessagingOperationNameBasicPublish, routingKey, exchange, 0, bodySize, activity);
}

return activity;

}

internal static Activity? ReceiveEmpty(string queue)
internal static Activity? BasicGetEmpty(string queue)
{
if (!s_subscriberSource.HasListeners())
{
return null;
}

Activity? activity = s_subscriberSource.StartRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{queue} receive" : "receive",
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicGetEmpty} {queue}" : MessagingOperationNameBasicGetEmpty,
ActivityKind.Consumer);
if (activity != null && activity.IsAllDataRequested)
{
activity
.SetTag(MessagingOperation, "receive")
.SetTag(MessagingOperationType, MessagingOperationTypeReceive)
.SetTag(MessagingOperationName, MessagingOperationNameBasicGetEmpty)
.SetTag(MessagingDestination, "amq.default");
}

return activity;
}

internal static Activity? Receive(string routingKey, string exchange, ulong deliveryTag,
internal static Activity? BasicGet(string routingKey, string exchange, ulong deliveryTag,
IReadOnlyBasicProperties readOnlyBasicProperties, int bodySize)
{
if (!s_subscriberSource.HasListeners())
Expand All @@ -107,11 +116,11 @@ public static class RabbitMQActivitySource

// Extract the PropagationContext of the upstream parent from the message headers.
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{routingKey} receive" : "receive", ActivityKind.Consumer,
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicGet} {routingKey}" : MessagingOperationNameBasicGet, ActivityKind.Consumer,
ContextExtractor(readOnlyBasicProperties));
if (activity != null && activity.IsAllDataRequested)
{
PopulateMessagingTags("receive", routingKey, exchange, deliveryTag, readOnlyBasicProperties,
PopulateMessagingTags(MessagingOperationTypeReceive, MessagingOperationNameBasicGet, routingKey, exchange, deliveryTag, readOnlyBasicProperties,
bodySize, activity);
}

Expand All @@ -128,11 +137,11 @@ public static class RabbitMQActivitySource

// Extract the PropagationContext of the upstream parent from the message headers.
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{routingKey} deliver" : "deliver",
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicDeliver} {routingKey}" : MessagingOperationNameBasicDeliver,
ActivityKind.Consumer, ContextExtractor(basicProperties));
if (activity != null && activity.IsAllDataRequested)
{
PopulateMessagingTags("deliver", routingKey, exchange,
PopulateMessagingTags(MessagingOperationTypeProcess, MessagingOperationNameBasicDeliver, routingKey, exchange,
deliveryTag, basicProperties, bodySize, activity);
}

Expand All @@ -154,10 +163,10 @@ public static class RabbitMQActivitySource
?.Start();
}

private static void PopulateMessagingTags(string operation, string routingKey, string exchange,
private static void PopulateMessagingTags(string operationType, string operationName, string routingKey, string exchange,
ulong deliveryTag, IReadOnlyBasicProperties readOnlyBasicProperties, int bodySize, Activity activity)
{
PopulateMessagingTags(operation, routingKey, exchange, deliveryTag, bodySize, activity);
PopulateMessagingTags(operationType, operationName, routingKey, exchange, deliveryTag, bodySize, activity);

if (!string.IsNullOrEmpty(readOnlyBasicProperties.CorrelationId))
{
Expand All @@ -170,11 +179,12 @@ private static void PopulateMessagingTags(string operation, string routingKey, s
}
}

private static void PopulateMessagingTags(string operation, string routingKey, string exchange,
private static void PopulateMessagingTags(string operationType, string operationName, string routingKey, string exchange,
ulong deliveryTag, int bodySize, Activity activity)
{
activity
.SetTag(MessagingOperation, operation)
.SetTag(MessagingOperationType, operationType)
.SetTag(MessagingOperationName, operationName)
.SetTag(MessagingDestination, string.IsNullOrEmpty(exchange) ? "amq.default" : exchange)
.SetTag(MessagingDestinationRoutingKey, routingKey)
.SetTag(MessagingBodySize, bodySize);
Expand Down
6 changes: 3 additions & 3 deletions projects/Test/SequentialIntegration/TestActivitySource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ private static ActivityListener StartActivityListener(List<Activity> activities)
private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueName,
List<Activity> activityList, bool isDeliver = false)
{
string childName = isDeliver ? "deliver" : "receive";
string childName = isDeliver ? "deliver" : "fetch";
Activity[] activities = activityList.ToArray();
Assert.NotEmpty(activities);

Expand All @@ -418,11 +418,11 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueN
}

Activity sendActivity = activities.First(x =>
x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} publish" : "publish") &&
x.OperationName == (useRoutingKeyAsOperationName ? $"publish {queueName}" : "publish") &&
x.GetTagItem(RabbitMQActivitySource.MessagingDestinationRoutingKey) is string routingKeyTag &&
routingKeyTag == $"{queueName}");
Activity receiveActivity = activities.Single(x =>
x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} {childName}" : $"{childName}") &&
x.OperationName == (useRoutingKeyAsOperationName ? $"{childName} {queueName}" : childName) &&
x.Links.First().Context.TraceId == sendActivity.TraceId);
Assert.Equal(ActivityKind.Producer, sendActivity.Kind);
Assert.Equal(ActivityKind.Consumer, receiveActivity.Kind);
Expand Down
11 changes: 8 additions & 3 deletions projects/Test/SequentialIntegration/TestOpenTelemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,8 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueName,
List<Activity> activityList, bool isDeliver = false, string baggageGuid = null)
{
string childName = isDeliver ? "deliver" : "receive";
string childName = isDeliver ? "deliver" : "fetch";
string childType = isDeliver ? "process" : "receive";
Activity[] activities = activityList.ToArray();
Assert.NotEmpty(activities);
foreach (var item in activities)
Expand All @@ -354,11 +355,11 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueN
}

Activity sendActivity = activities.First(x =>
x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} publish" : "publish") &&
x.OperationName == (useRoutingKeyAsOperationName ? $"publish {queueName}" : "publish") &&
x.GetTagItem(RabbitMQActivitySource.MessagingDestinationRoutingKey) is string routingKeyTag &&
routingKeyTag == $"{queueName}");
Activity receiveActivity = activities.Single(x =>
x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} {childName}" : $"{childName}") &&
x.OperationName == (useRoutingKeyAsOperationName ? $"{childName} {queueName}" : childName) &&
x.Links.First().Context.TraceId == sendActivity.TraceId);
Assert.Equal(ActivityKind.Producer, sendActivity.Kind);
Assert.Equal(ActivityKind.Consumer, receiveActivity.Kind);
Expand All @@ -380,6 +381,10 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueN
AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingEnvelopeSize);
AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingBodySize);
AssertIntTagGreaterThanZero(receiveActivity, RabbitMQActivitySource.MessagingBodySize);
AssertStringTagEquals(receiveActivity, RabbitMQActivitySource.MessagingOperationType, childType);
AssertStringTagEquals(receiveActivity, RabbitMQActivitySource.MessagingOperationName, childName);
AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingOperationType, "send");
AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingOperationName, "publish");
}
}
}
Loading