Skip to content

Commit

Permalink
feat: use semconv v1.26.0 (#42)
Browse files Browse the repository at this point in the history
Signed-off-by: Cattī Crūdēlēs <[email protected]>
  • Loading branch information
wzy9607 authored Jul 6, 2024
1 parent 01333ba commit 59afe92
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
7 changes: 5 additions & 2 deletions acknowledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"

"github.com/rabbitmq/amqp091-go"
Expand All @@ -19,9 +20,9 @@ type acknowledger struct {
func (a *acknowledger) Ack(tag uint64, multiple bool) error {
err := a.acker.Ack(tag, multiple)
if multiple {
a.endMultiple(tag, codes.Ok, "", err)
a.endMultiple(tag, codes.Ok, "ack", err)
} else {
a.endOne(tag, codes.Ok, "", err)
a.endOne(tag, codes.Ok, "ack", err)
}
return err
}
Expand All @@ -47,6 +48,7 @@ func (a *acknowledger) endMultiple(lastTag uint64, code codes.Code, desc string,
defer a.ch.m.Unlock()

for tag, span := range a.ch.spanMap {
span.SetAttributes(semconv.MessagingOperationName(desc))
if tag <= lastTag {
if err != nil {
span.RecordError(err)
Expand All @@ -62,6 +64,7 @@ func (a *acknowledger) endOne(tag uint64, code codes.Code, desc string, err erro
a.ch.m.Lock()
defer a.ch.m.Unlock()

a.span.SetAttributes(semconv.MessagingOperationName(desc))
if err != nil {
a.span.RecordError(err)
}
Expand Down
15 changes: 7 additions & 8 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ import (

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.25.0"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"

"github.com/rabbitmq/amqp091-go"
)

const (
netProtocolVer = "0.9.1"
messageSystem = "rabbitmq"
)

func queueAnonymous(queue string) bool {
Expand Down Expand Up @@ -87,12 +86,11 @@ func (ch *Channel) startConsumerSpan(msg *amqp091.Delivery, queue string, operat
attrs := []attribute.KeyValue{
operation,
semconv.MessagingDestinationAnonymous(queueAnonymous(queue)),
// fixme messaging.destination.name MUST be set to the name of the exchange.
semconv.MessagingDestinationName(queue),
semconv.MessagingDestinationPublishAnonymous(msg.Exchange == ""),
semconv.MessagingDestinationPublishName(msg.Exchange),
// todo messaging.client.id
semconv.MessagingRabbitmqDestinationRoutingKey(msg.RoutingKey),
// todo messaging.client_id
}
if msg.MessageCount != 0 {
attrs = append(attrs, semconv.MessagingBatchMessageCount(int(msg.MessageCount)))
Expand Down Expand Up @@ -136,7 +134,7 @@ func (ch *Channel) Consume(
newDeliveries := make(chan amqp091.Delivery)
go func() {
for msg := range deliveries {
ch.startConsumerSpan(&msg, queue, semconv.MessagingOperationDeliver)
ch.startConsumerSpan(&msg, queue, semconv.MessagingOperationTypeDeliver)
newDeliveries <- msg
}
close(newDeliveries)
Expand All @@ -156,11 +154,12 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(
) (*amqp091.DeferredConfirmation, error) {
// Create a span.
attrs := []attribute.KeyValue{
semconv.MessagingOperationPublish,
semconv.MessagingOperationTypePublish,
semconv.MessagingOperationName("send"),
semconv.MessagingDestinationAnonymous(exchange == ""),
semconv.MessagingDestinationName(exchange),
// todo messaging.client.id
semconv.MessagingRabbitmqDestinationRoutingKey(key),
// todo messaging.client_id
}
if msg.CorrelationId != "" {
attrs = append(attrs, semconv.MessagingMessageConversationID(msg.CorrelationId))
Expand Down Expand Up @@ -194,6 +193,6 @@ func (ch *Channel) Get(queue string, autoAck bool) (msg amqp091.Delivery, ok boo
if err != nil || !ok {
return
}
ch.startConsumerSpan(&msg, queue, semconv.MessagingOperationReceive)
ch.startConsumerSpan(&msg, queue, semconv.MessagingOperationTypeReceive)
return msg, ok, err
}

0 comments on commit 59afe92

Please sign in to comment.