Skip to content

Commit

Permalink
fix(pubsub): add attributes before startSpan
Browse files Browse the repository at this point in the history
Attributes can be used for sampling and so they should be added before
span creation, if possible. See [1].

[1] https://opentelemetry.io/docs/concepts/signals/traces/#attributes
  • Loading branch information
jameshartig committed Aug 30, 2024
1 parent 633dc86 commit bc476e4
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 20 deletions.
53 changes: 35 additions & 18 deletions pubsub/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,14 +335,17 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
if m.Attributes != nil {
ctx = propagation.TraceContext{}.Extract(ctx, newMessageCarrier(m))
}
attr := getSubscriberOpts(it.projectID, it.subID, m)
_, span := startSpan(ctx, subscribeSpanName, it.subID, attr...)
span.SetAttributes(
attribute.Bool(eosAttribute, it.enableExactlyOnceDelivery),
attribute.String(ackIDAttribute, ackID),
semconv.MessagingBatchMessageCount(len(msgs)),
semconv.CodeFunction("receive"),
opts := getSubscriberOpts(it.projectID, it.subID, m)
opts = append(
opts,
trace.WithAttributes(
attribute.Bool(eosAttribute, it.enableExactlyOnceDelivery),
attribute.String(ackIDAttribute, ackID),
semconv.MessagingBatchMessageCount(len(msgs)),
semconv.CodeFunction("receive"),
),
)
_, span := startSpan(ctx, subscribeSpanName, it.subID, opts...)
// Always store the subscribe span, even if sampling isn't enabled.
// This is useful since we need to propagate the sampling flag
// to the callback in Receive, so traces have an unbroken sampling decision.
Expand Down Expand Up @@ -658,11 +661,16 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) {
// Create the single ack span for this request, and for each
// message, add Subscribe<->Ack links.
opts := getCommonOptions(it.projectID, it.subID)
opts = append(opts, trace.WithLinks(links...))
opts = append(
opts,
trace.WithLinks(links...),
trace.WithAttributes(
semconv.MessagingBatchMessageCount(len(ackIDs)),
semconv.CodeFunction("sendAck"),
),
)
_, ackSpan := startSpan(context.Background(), ackSpanName, it.subID, opts...)
defer ackSpan.End()
ackSpan.SetAttributes(semconv.MessagingBatchMessageCount(len(ackIDs)),
semconv.CodeFunction("sendAck"))
if ackSpan.SpanContext().IsSampled() {
for _, s := range subscribeSpans {
s.AddLink(trace.Link{
Expand Down Expand Up @@ -740,16 +748,25 @@ func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Dur
// Create the single modack/nack span for this request, and for each
// message, add Subscribe<->Modack links.
opts := getCommonOptions(it.projectID, it.subID)
opts = append(opts, trace.WithLinks(links...))
_, mSpan := startSpan(context.Background(), spanName, it.subID, opts...)
defer mSpan.End()
opts = append(
opts,
trace.WithLinks(links...),
trace.WithAttributes(
semconv.MessagingBatchMessageCount(len(ackIDs)),
semconv.CodeFunction("sendModAck"),
),
)
if !isNack {
mSpan.SetAttributes(
semconv.MessagingGCPPubsubMessageAckDeadline(int(deadlineSec)),
attribute.Bool(receiptModackAttribute, isReceipt))
opts = append(
opts,
trace.WithAttributes(
semconv.MessagingGCPPubsubMessageAckDeadline(int(deadlineSec)),
attribute.Bool(receiptModackAttribute, isReceipt),
),
)
}
mSpan.SetAttributes(semconv.MessagingBatchMessageCount(len(ackIDs)),
semconv.CodeFunction("sendModAck"))
_, mSpan := startSpan(context.Background(), spanName, it.subID, opts...)
defer mSpan.End()
if mSpan.SpanContext().IsSampled() {
for _, s := range subscribeSpans {
s.AddLink(trace.Link{
Expand Down
10 changes: 8 additions & 2 deletions pubsub/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,8 +748,8 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
var createSpan trace.Span
if t.enableTracing {
opts := getPublishSpanAttributes(t.c.projectID, t.ID(), msg)
opts = append(opts, trace.WithAttributes(semconv.CodeFunction("Publish")))
ctx, createSpan = startSpan(ctx, createSpanName, t.ID(), opts...)
createSpan.SetAttributes(semconv.CodeFunction("Publish"))
}
ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name))
if err != nil {
Expand Down Expand Up @@ -973,8 +973,14 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage)
opts := getCommonOptions(projectID, topicID)
// Add link to publish RPC span of createSpan(s).
opts = append(opts, trace.WithLinks(links...))
opts = append(
opts,
trace.WithAttributes(
semconv.MessagingBatchMessageCount(numMsgs),
semconv.CodeFunction("publishMessageBundle"),
),
)
ctx, pSpan = startSpan(ctx, publishRPCSpanName, topicID, opts...)
pSpan.SetAttributes(semconv.MessagingBatchMessageCount(numMsgs), semconv.CodeFunction("publishMessageBundle"))
defer pSpan.End()

// Add the reverse link to createSpan(s) of publish RPC span.
Expand Down

0 comments on commit bc476e4

Please sign in to comment.