Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use semconv v1.26.0 #42

Merged
merged 1 commit into from
Jul 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions acknowledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"

"github.com/rabbitmq/amqp091-go"
Expand All @@ -19,9 +20,9 @@ type acknowledger struct {
func (a *acknowledger) Ack(tag uint64, multiple bool) error {
err := a.acker.Ack(tag, multiple)
if multiple {
a.endMultiple(tag, codes.Ok, "", err)
a.endMultiple(tag, codes.Ok, "ack", err)
} else {
a.endOne(tag, codes.Ok, "", err)
a.endOne(tag, codes.Ok, "ack", err)
}
return err
}
Expand All @@ -47,6 +48,7 @@ func (a *acknowledger) endMultiple(lastTag uint64, code codes.Code, desc string,
defer a.ch.m.Unlock()

for tag, span := range a.ch.spanMap {
span.SetAttributes(semconv.MessagingOperationName(desc))
if tag <= lastTag {
if err != nil {
span.RecordError(err)
Expand All @@ -62,6 +64,7 @@ func (a *acknowledger) endOne(tag uint64, code codes.Code, desc string, err erro
a.ch.m.Lock()
defer a.ch.m.Unlock()

a.span.SetAttributes(semconv.MessagingOperationName(desc))
if err != nil {
a.span.RecordError(err)
}
Expand Down
15 changes: 7 additions & 8 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.25.0"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"

"github.com/rabbitmq/amqp091-go"
)

const (
netProtocolVer = "0.9.1"
messageSystem = "rabbitmq"
)

func queueAnonymous(queue string) bool {
Expand Down Expand Up @@ -87,12 +86,11 @@
attrs := []attribute.KeyValue{
operation,
semconv.MessagingDestinationAnonymous(queueAnonymous(queue)),
// fixme messaging.destination.name MUST be set to the name of the exchange.
semconv.MessagingDestinationName(queue),
semconv.MessagingDestinationPublishAnonymous(msg.Exchange == ""),
semconv.MessagingDestinationPublishName(msg.Exchange),
// todo messaging.client.id
semconv.MessagingRabbitmqDestinationRoutingKey(msg.RoutingKey),
// todo messaging.client_id
}
if msg.MessageCount != 0 {
attrs = append(attrs, semconv.MessagingBatchMessageCount(int(msg.MessageCount)))
Expand Down Expand Up @@ -136,7 +134,7 @@
newDeliveries := make(chan amqp091.Delivery)
go func() {
for msg := range deliveries {
ch.startConsumerSpan(&msg, queue, semconv.MessagingOperationDeliver)
ch.startConsumerSpan(&msg, queue, semconv.MessagingOperationTypeDeliver)

Check warning on line 137 in channel.go

View check run for this annotation

Codecov / codecov/patch

channel.go#L137

Added line #L137 was not covered by tests
newDeliveries <- msg
}
close(newDeliveries)
Expand All @@ -156,11 +154,12 @@
) (*amqp091.DeferredConfirmation, error) {
// Create a span.
attrs := []attribute.KeyValue{
semconv.MessagingOperationPublish,
semconv.MessagingOperationTypePublish,
semconv.MessagingOperationName("send"),

Check warning on line 158 in channel.go

View check run for this annotation

Codecov / codecov/patch

channel.go#L157-L158

Added lines #L157 - L158 were not covered by tests
semconv.MessagingDestinationAnonymous(exchange == ""),
semconv.MessagingDestinationName(exchange),
// todo messaging.client.id
semconv.MessagingRabbitmqDestinationRoutingKey(key),
// todo messaging.client_id
}
if msg.CorrelationId != "" {
attrs = append(attrs, semconv.MessagingMessageConversationID(msg.CorrelationId))
Expand Down Expand Up @@ -194,6 +193,6 @@
if err != nil || !ok {
return
}
ch.startConsumerSpan(&msg, queue, semconv.MessagingOperationReceive)
ch.startConsumerSpan(&msg, queue, semconv.MessagingOperationTypeReceive)

Check warning on line 196 in channel.go

View check run for this annotation

Codecov / codecov/patch

channel.go#L196

Added line #L196 was not covered by tests
return msg, ok, err
}