diff --git a/go.work b/go.work index 5a8408363b5b..f85458707d61 100644 --- a/go.work +++ b/go.work @@ -1,5 +1,7 @@ go 1.21 +toolchain go1.21.0 + use ( . ./accessapproval diff --git a/go.work.sum b/go.work.sum index 9c215e26948d..674c71e359fc 100644 --- a/go.work.sum +++ b/go.work.sum @@ -4,12 +4,9 @@ cloud.google.com/go/dataproc v1.12.0 h1:W47qHL3W4BPkAIbk4SWmIERwsWBaNnWm0P2sdx3Y cloud.google.com/go/gaming v1.9.0 h1:7vEhFnZmd931Mo7sZ6pJy7uQPDxF7m7v8xtBheG08tc= github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.18.0 h1:ugYJK/neZQtQeh2jc5xNoDFiMQojlAkoqJMRb7vTu1U= github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.18.0/go.mod h1:Xx0VKh7GJ4si3rmElbh19Mejxz68ibWg/J30ZOMrqzU= -github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0.46.0/go.mod h1:V28hx+cUCZC9e3qcqszMb+Sbt8cQZtHTiXOmyDzoDOg= github.com/Masterminds/goutils v1.1.1/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU= github.com/Masterminds/semver/v3 v3.2.0/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ= github.com/Masterminds/sprig/v3 v3.2.3/go.mod h1:rXcFaZ2zZbLRJv/xSysmlgIM1u11eBaRMhvYXJNkGuM= -github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4= -github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/aws/aws-sdk-go-v2 v1.16.10/go.mod h1:WTACcleLz6VZTp7fak4EO5b9Q4foxbn+8PIz3PmyKlo= github.com/aws/aws-sdk-go-v2/config v1.15.9/go.mod h1:rv/l/TbZo67kp99v/3Kb0qV6Fm1KEtKyruEV2GvVfgs= github.com/aws/aws-sdk-go-v2/credentials v1.12.12/go.mod h1:vFHC2HifIWHebmoVsfpqliKuqbAY2LaVlvy03JzF4c4= @@ -29,10 +26,7 @@ github.com/chromedp/cdproto v0.0.0-20230802225258-3cf4e6d46a89/go.mod h1:GKljq0V github.com/chromedp/chromedp v0.9.2/go.mod h1:LkSXJKONWTCHAfQasKFUZI+mxqS4tZqhmtGzzhLsnLs= github.com/chromedp/sysutil v1.0.0/go.mod h1:kgWmDdq8fTzXYcKIBqIYvRRTnYb9aNS9moAV0xufSww= github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObkaSkeBlk= -github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM= github.com/fullstorydev/grpcurl v1.8.7/go.mod h1:pVtM4qe3CMoLaIzYS8uvTuDj2jVYmXqMUkZeijnXp/E= -github.com/gliderlabs/ssh v0.3.7/go.mod h1:zpHEXBstFnQYtGnB8k8kQLol82umzn/2/snG7alWVD8= -github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399/go.mod h1:1OCfN199q1Jm3HZlxleg+Dw/mwps2Wbk9frAWm+4FII= github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= github.com/gobwas/ws v1.2.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY= @@ -55,15 +49,14 @@ github.com/miekg/dns v1.1.33/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7 github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw= github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/mmcloughlin/avo v0.5.0/go.mod h1:ChHFdoV7ql95Wi7vuq2YT1bwCJqiWdZrQ1im3VujLYM= -github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/sirupsen/logrus v1.9.0/go.mod 
h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= -go.einride.tech/aip v0.68.0 h1:4seM66oLzTpz50u4K1zlJyOXQ3tCzcJN7I22tKkjipw= go.opentelemetry.io/otel v1.23.1/go.mod h1:Td0134eafDLcTS4y+zQ26GE8u3dEuRBiBCTUIRHaikA= +go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= go.opentelemetry.io/otel/bridge/opencensus v0.40.0 h1:pqDiayRhBgoqy1vwnscik+TizcImJ58l053NScJyZso= go.opentelemetry.io/otel/bridge/opencensus v0.40.0/go.mod h1:1NvVHb6tLTe5A9qCYz+eErW0t8iPn4ZfR6tDKcqlGTM= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0/go.mod h1:U707O40ee1FpQGyhvqnzmCJm1Wh6OX6GGBVn0E6Uyyk= @@ -79,8 +72,6 @@ golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4= golang.org/x/tools v0.10.0 h1:tvDr/iQoUqNdohiYm0LmmKcBk+q86lb9EprIUFhHHGg= golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc= google.golang.org/api v0.174.0/go.mod h1:aC7tB6j0HR1Nl0ni5ghpx6iLasmAX78Zkh/wgxAAjLg= -google.golang.org/api v0.185.0 h1:ENEKk1k4jW8SmmaT6RE+ZasxmxezCrD5Vw4npvr+pAU= -google.golang.org/api v0.185.0/go.mod h1:HNfvIkJGlgrIlrbYkAm9W9IdkmKZjOTVh33YltygGbg= google.golang.org/genproto v0.0.0-20230725213213-b022f6e96895/go.mod h1:0ggbjUrZYpy1q+ANUS30SEoGZ53cdfwtbuG7Ptgy108= google.golang.org/genproto/googleapis/api v0.0.0-20230725213213-b022f6e96895/go.mod h1:rsr7RhLuwsDKL7RmgDDCUc6yaGr1iqceVb5Wv6f6YvQ= google.golang.org/genproto/googleapis/api v0.0.0-20240515191416-fc5f0ca64291/go.mod h1:RGnPtTG7r4i8sPlNyDeikXF99hMM+hN6QMm4ooG9g2g= diff --git a/pubsub/go.mod b/pubsub/go.mod index e9e1d6e7899f..7b103fa684fe 100644 --- a/pubsub/go.mod +++ b/pubsub/go.mod @@ -2,6 +2,8 @@ module cloud.google.com/go/pubsub go 1.21 +toolchain go1.21.0 + require ( cloud.google.com/go v0.115.0 cloud.google.com/go/iam v1.1.12 @@ -11,6 +13,9 @@ require ( github.com/googleapis/gax-go/v2 v2.13.0 go.einride.tech/aip v0.67.1 go.opencensus.io v0.24.0 + go.opentelemetry.io/otel v1.28.0 + go.opentelemetry.io/otel/sdk v1.28.0 + go.opentelemetry.io/otel/trace v1.28.0 golang.org/x/oauth2 v0.22.0 golang.org/x/sync v0.8.0 golang.org/x/time v0.6.0 @@ -34,10 +39,7 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect - go.opentelemetry.io/otel v1.24.0 // indirect - go.opentelemetry.io/otel/metric v1.24.0 // indirect - go.opentelemetry.io/otel/sdk v1.24.0 // indirect - go.opentelemetry.io/otel/trace v1.24.0 // indirect + go.opentelemetry.io/otel/metric v1.28.0 // indirect golang.org/x/crypto v0.25.0 // indirect golang.org/x/net v0.27.0 // indirect golang.org/x/sys v0.22.0 // indirect diff --git a/pubsub/go.sum b/pubsub/go.sum index da49ebbdd3e3..f36210a2675b 100644 --- a/pubsub/go.sum +++ b/pubsub/go.sum @@ -83,14 +83,14 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.4 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= -go.opentelemetry.io/otel 
v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
-go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo=
-go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI=
-go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco=
-go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw=
-go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg=
-go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI=
-go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU=
+go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
+go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
+go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q=
+go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s=
+go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE=
+go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg=
+go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
+go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
diff --git a/pubsub/iterator.go b/pubsub/iterator.go
index e06d52ded4af..9f60359040fb 100644
--- a/pubsub/iterator.go
+++ b/pubsub/iterator.go
@@ -29,6 +29,10 @@ import (
 	"cloud.google.com/go/pubsub/internal/distribution"
 	gax "github.com/googleapis/gax-go/v2"
 	"github.com/googleapis/gax-go/v2/apierror"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/propagation"
+	semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
+	"go.opentelemetry.io/otel/trace"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -67,6 +71,8 @@ type messageIterator struct {
 	po        *pullOptions
 	ps        *pullStream
 	subc      *vkit.SubscriberClient
+	projectID string
+	subID     string
 	subName   string
 	kaTick    <-chan time.Time // keep-alive (deadline extensions)
 	ackTicker *time.Ticker     // message acks
@@ -106,6 +112,13 @@ type messageIterator struct {
 	// by the response in StreamingPull and can change mid Receive. Must be accessed
 	// with the lock held.
 	enableOrdering bool
+
+	// enableTracing enables span creation for this subscriber iterator.
+	enableTracing bool
+	// activeSpans maps an ackID (string) to its root subscribe span (trace.Span), used for OTel tracing.
+	// Active ackIDs in this map should also exist 1:1 with ids in keepAliveDeadlines.
+	// Elements are removed when messages are acked, nacked, or expired in iterator.handleKeepAlives().
+	activeSpans sync.Map
 }

 // newMessageIterator starts and returns a new messageIterator.
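Reviewer note: the activeSpans field added above is the heart of the subscribe-side bookkeeping — each received message's root subscribe span is stored under its ackID and must be ended exactly once, whether the message is acked, nacked, or expired. A minimal self-contained sketch of that pattern follows; the spanTracker name is illustrative only, not part of this change, and the noop tracer just stands in for whatever provider is registered:

package main

import (
	"context"
	"fmt"
	"sync"

	"go.opentelemetry.io/otel/trace"
	"go.opentelemetry.io/otel/trace/noop"
)

// spanTracker mirrors the iterator's ackID -> root span bookkeeping.
type spanTracker struct {
	active sync.Map // ackID (string) -> trace.Span
}

// track stores the root span for a newly received message.
func (t *spanTracker) track(ackID string, s trace.Span) {
	t.active.Store(ackID, s)
}

// finish ends the span for ackID at most once; LoadAndDelete makes a
// second ack/nack/expiry for the same ID a harmless no-op.
func (t *spanTracker) finish(ackID string) {
	if v, ok := t.active.LoadAndDelete(ackID); ok {
		v.(trace.Span).End()
	}
}

func main() {
	tracer := noop.NewTracerProvider().Tracer("example")
	_, span := tracer.Start(context.Background(), "subscribe")

	tk := &spanTracker{}
	tk.track("ack-1", span)
	tk.finish("ack-1") // e.g. the ack path
	tk.finish("ack-1") // e.g. a later expiry: already removed, no double End
	fmt.Println("span ended exactly once")
}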
@@ -134,12 +147,17 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt receiptTicker := time.NewTicker(100 * time.Millisecond) cctx, cancel := context.WithCancel(context.Background()) cctx = withSubscriptionKey(cctx, subName) + + projectID, subID := parseResourceName(subName) + it := &messageIterator{ ctx: cctx, cancel: cancel, ps: ps, po: po, subc: subc, + projectID: projectID, + subID: subID, subName: subName, kaTick: time.After(keepAlivePeriod), ackTicker: ackTicker, @@ -269,7 +287,9 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { if err != nil { return nil, it.fail(err) } + recordStat(it.ctx, PullCount, int64(len(rmsgs))) + now := time.Now() msgs, err := convertMessages(rmsgs, now, it.done) if err != nil { @@ -309,6 +329,25 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { pendingMessages[ackID] = m } } + + if it.enableTracing { + ctx := context.Background() + if m.Attributes != nil { + ctx = propagation.TraceContext{}.Extract(ctx, newMessageCarrier(m)) + } + attr := getSubscriberOpts(it.projectID, it.subID, m) + _, span := startSpan(ctx, subscribeSpanName, it.subID, attr...) + span.SetAttributes( + attribute.Bool(eosAttribute, it.enableExactlyOnceDelivery), + attribute.String(ackIDAttribute, ackID), + semconv.MessagingBatchMessageCount(len(msgs)), + semconv.CodeFunction("receive"), + ) + // Always store the subscribe span, even if sampling isn't enabled. + // This is useful since we need to propagate the sampling flag + // to the callback in Receive, so traces have an unbroken sampling decision. + it.activeSpans.Store(ackID, span) + } } deadline := it.ackDeadline() it.mu.Unlock() @@ -328,7 +367,7 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { // If exactly once is enabled, we should wait until modack responses are successes // before attempting to process messages. - it.sendModAck(ackIDs, deadline, false) + it.sendModAck(ackIDs, deadline, false, true) for ackID, ar := range ackIDs { ctx := context.Background() _, err := ar.Get(ctx) @@ -498,16 +537,16 @@ func (it *messageIterator) sender() { } if sendNacks { // Nack indicated by modifying the deadline to zero. - it.sendModAck(nacks, 0, false) + it.sendModAck(nacks, 0, false, false) } if sendModAcks { - it.sendModAck(modAcks, dl, true) + it.sendModAck(modAcks, dl, true, false) } if sendPing { it.pingStream() } if sendReceipt { - it.sendModAck(receipts, dl, true) + it.sendModAck(receipts, dl, true, true) } } } @@ -520,11 +559,23 @@ func (it *messageIterator) handleKeepAlives() { now := time.Now() for id, expiry := range it.keepAliveDeadlines { if expiry.Before(now) { + // Message is now expired. // This delete will not result in skipping any map items, as implied by // the spec at https://golang.org/ref/spec#For_statements, "For // statements with range clause", note 3, and stated explicitly at // https://groups.google.com/forum/#!msg/golang-nuts/UciASUb03Js/pzSq5iVFAQAJ. delete(it.keepAliveDeadlines, id) + if it.enableTracing { + // get the parent span context for this ackID for otel tracing. + // This message is now expired, so if the ackID is still valid, + // mark that span as expired and end the span. + s, ok := it.activeSpans.LoadAndDelete(id) + if ok { + span := s.(trace.Span) + span.SetAttributes(attribute.String(resultAttribute, resultExpired)) + span.End() + } + } } else { // Use a success AckResult since we don't propagate ModAcks back to the user. 
				it.pendingModAcks[id] = newSuccessAckResult()
@@ -539,8 +590,8 @@ type retryAckFunc = func(toRetry map[string]*ipubsub.AckResult)

 func (it *messageIterator) sendAckWithFunc(m map[string]*AckResult, ackFunc ackFunc, retryAckFunc retryAckFunc, ackRecordStat ackRecordStat) {
 	ackIDs := make([]string, 0, len(m))
-	for k := range m {
-		ackIDs = append(ackIDs, k)
+	for ackID := range m {
+		ackIDs = append(ackIDs, ackID)
 	}
 	it.eoMu.RLock()
 	exactlyOnceDelivery := it.enableExactlyOnceDelivery
@@ -563,11 +614,10 @@
 		for _, ackID := range toSend {
 			resultsByAckID[ackID] = m[ackID]
 		}
-
 		st, md := extractMetadata(err)
 		_, toRetry := processResults(st, resultsByAckID, md)
 		if len(toRetry) > 0 {
-			// Retry modacks/nacks in a separate goroutine.
+			// Retry acks/modacks/nacks in a separate goroutine.
 			go func() {
 				retryAckFunc(toRetry)
 			}()
@@ -581,14 +631,57 @@
 // sendAck is used to confirm acknowledgement of a message. If exactly once delivery is
 // enabled, we'll retry these messages for a short duration in a goroutine.
 func (it *messageIterator) sendAck(m map[string]*AckResult) {
-	it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIds []string) error {
+	it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIDs []string) error {
+		// For each ackID (message), set up links to the main subscribe span,
+		// and remove it from active spans.
+		// If the ackID is not found, don't create any more spans.
+		if it.enableTracing {
+			var links []trace.Link
+			subscribeSpans := make([]trace.Span, 0, len(ackIDs))
+			for _, ackID := range ackIDs {
+				// get the main subscribe span context for this ackID for otel tracing.
+				s, ok := it.activeSpans.LoadAndDelete(ackID)
+				if ok {
+					subscribeSpan := s.(trace.Span)
+					defer subscribeSpan.End()
+					defer subscribeSpan.SetAttributes(attribute.String(resultAttribute, resultAcked))
+					subscribeSpans = append(subscribeSpans, subscribeSpan)
+					subscribeSpan.AddEvent(eventAckStart, trace.WithAttributes(semconv.MessagingBatchMessageCount(len(ackIDs))))
+					defer subscribeSpan.AddEvent(eventAckEnd)
+					// Only add this link if the span is sampled, otherwise we're creating invalid links.
+					if subscribeSpan.SpanContext().IsSampled() {
+						links = append(links, trace.Link{SpanContext: subscribeSpan.SpanContext()})
+					}
+				}
+			}
+
+			// Create the single ack span for this request, and for each
+			// message, add Subscribe<->Ack links.
+			opts := getCommonOptions(it.projectID, it.subID)
+			opts = append(opts, trace.WithLinks(links...))
+			_, ackSpan := startSpan(context.Background(), ackSpanName, it.subID, opts...)
+ defer ackSpan.End() + ackSpan.SetAttributes(semconv.MessagingBatchMessageCount(len(ackIDs)), + semconv.CodeFunction("sendAck")) + if ackSpan.SpanContext().IsSampled() { + for _, s := range subscribeSpans { + s.AddLink(trace.Link{ + SpanContext: ackSpan.SpanContext(), + Attributes: []attribute.KeyValue{ + semconv.MessagingOperationName(ackSpanName), + }, + }) + } + } + } return it.subc.Acknowledge(ctx, &pb.AcknowledgeRequest{ Subscription: it.subName, - AckIds: ackIds, + AckIds: ackIDs, }) }, it.retryAcks, func(ctx context.Context, toSend []string) { recordStat(it.ctx, AckCount, int64(len(toSend))) addAcks(toSend) + }) } @@ -598,13 +691,80 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) { // percentile in order to capture the highest amount of time necessary without // considering 1% outliers. If the ModAck RPC fails and exactly once delivery is // enabled, we retry it in a separate goroutine for a short duration. -func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration, logOnInvalid bool) { +func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration, logOnInvalid, isReceipt bool) { deadlineSec := int32(deadline / time.Second) - it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIds []string) error { + isNack := deadline == 0 + var spanName, eventStart, eventEnd string + if isNack { + spanName = nackSpanName + eventStart = eventNackStart + eventEnd = eventNackEnd + } else { + spanName = modackSpanName + eventStart = eventModackStart + eventEnd = eventModackEnd + } + it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIDs []string) error { + if it.enableTracing { + // For each ackID (message), link back to the main subscribe span. + // If this is a nack, also remove it from active spans. + // If the ackID is not found, don't create any more spans. + links := make([]trace.Link, 0, len(ackIDs)) + subscribeSpans := make([]trace.Span, 0, len(ackIDs)) + for _, ackID := range ackIDs { + // get the parent span context for this ackID for otel tracing. + var s any + var ok bool + if isNack { + s, ok = it.activeSpans.LoadAndDelete(ackID) + } else { + s, ok = it.activeSpans.Load(ackID) + } + if ok { + subscribeSpan := s.(trace.Span) + subscribeSpans = append(subscribeSpans, subscribeSpan) + if isNack { + defer subscribeSpan.End() + defer subscribeSpan.SetAttributes(attribute.String(resultAttribute, resultNacked)) + } + subscribeSpan.AddEvent(eventStart, trace.WithAttributes(semconv.MessagingBatchMessageCount(len(ackIDs)))) + defer subscribeSpan.AddEvent(eventEnd) + + // Only add this link if the span is sampled, otherwise we're creating invalid links. + if subscribeSpan.SpanContext().IsSampled() { + links = append(links, trace.Link{SpanContext: subscribeSpan.SpanContext()}) + } + } + } + + // Create the single modack/nack span for this request, and for each + // message, add Subscribe<->Modack links. + opts := getCommonOptions(it.projectID, it.subID) + opts = append(opts, trace.WithLinks(links...)) + _, mSpan := startSpan(context.Background(), spanName, it.subID, opts...) 
+ defer mSpan.End() + if !isNack { + mSpan.SetAttributes( + semconv.MessagingGCPPubsubMessageAckDeadline(int(deadlineSec)), + attribute.Bool(receiptModackAttribute, isReceipt)) + } + mSpan.SetAttributes(semconv.MessagingBatchMessageCount(len(ackIDs)), + semconv.CodeFunction("sendModAck")) + if mSpan.SpanContext().IsSampled() { + for _, s := range subscribeSpans { + s.AddLink(trace.Link{ + SpanContext: mSpan.SpanContext(), + Attributes: []attribute.KeyValue{ + semconv.MessagingOperationName(spanName), + }, + }) + } + } + } return it.subc.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{ Subscription: it.subName, AckDeadlineSeconds: deadlineSec, - AckIds: ackIds, + AckIds: ackIDs, }) }, func(toRetry map[string]*ipubsub.AckResult) { it.retryModAcks(toRetry, deadlineSec, logOnInvalid) diff --git a/pubsub/iterator_test.go b/pubsub/iterator_test.go index 64e12edb37ce..6eccb0f2211c 100644 --- a/pubsub/iterator_test.go +++ b/pubsub/iterator_test.go @@ -549,7 +549,7 @@ func TestIterator_StreamingPullExactlyOnce(t *testing.T) { func TestAddToDistribution(t *testing.T) { c, _ := newFake(t) - iter := newMessageIterator(c.subc, "some-sub", &pullOptions{}) + iter := newMessageIterator(c.subc, "projects/p/subscriptions/some-sub", &pullOptions{}) // Start with a datapoint that's too small that should be bounded to 10s. receiveTime := time.Now().Add(time.Duration(-1) * time.Second) diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index e8250a3dbfce..d4124bcb952a 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -49,15 +49,24 @@ const ( // Clients should be reused rather than being created as needed. // A Client may be shared by multiple goroutines. type Client struct { - projectID string - pubc *vkit.PublisherClient - subc *vkit.SubscriberClient + projectID string + pubc *vkit.PublisherClient + subc *vkit.SubscriberClient + enableTracing bool } // ClientConfig has configurations for the client. type ClientConfig struct { PublisherCallOptions *vkit.PublisherCallOptions SubscriberCallOptions *vkit.SubscriberCallOptions + + // EnableOpenTelemetryTracing enables tracing for this client. + // This option allows selectively disabling Pub/Sub traces. + // This defaults to false. + // OpenTelemetry tracing standards are in active development, and thus + // attributes, links, and span names are EXPERIMENTAL and subject to + // change or removal without notice. + EnableOpenTelemetryTracing bool } // mergePublisherCallOptions merges two PublisherCallOptions into one and the first argument has @@ -136,7 +145,7 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio } // NewClientWithConfig creates a new PubSub client. 
-func NewClientWithConfig(ctx context.Context, projectID string, config *ClientConfig, opts ...option.ClientOption) (c *Client, err error) { +func NewClientWithConfig(ctx context.Context, projectID string, config *ClientConfig, opts ...option.ClientOption) (*Client, error) { if projectID == "" { return nil, ErrEmptyProjectID } @@ -187,11 +196,15 @@ func NewClientWithConfig(ctx context.Context, projectID string, config *ClientCo return nil, err } - return &Client{ + c := &Client{ projectID: projectID, pubc: pubc, subc: subc, - }, nil + } + if config != nil { + c.enableTracing = config.EnableOpenTelemetryTracing + } + return c, nil } // Project returns the project ID or number for this instance of the client, which may have diff --git a/pubsub/service.go b/pubsub/service.go index 928a77d3b1c5..71d541a94b0d 100644 --- a/pubsub/service.go +++ b/pubsub/service.go @@ -130,3 +130,14 @@ func newExactlyOnceBackoff() gax.Backoff { Multiplier: 2, } } + +// parseResourceName parses the project and resource ID from a fully qualified name. +// For example, "projects/p/topics/my-topic" -> "p", "my-topic". +// Returns empty strings if the input is misformatted. +func parseResourceName(fqn string) (string, string) { + s := strings.Split(fqn, "/") + if len(s) != 4 { + return "", "" + } + return s[1], s[len(s)-1] +} diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 51e5cf8a0ddf..c6311d6e933b 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -25,10 +25,13 @@ import ( "cloud.google.com/go/iam" "cloud.google.com/go/internal/optional" + ipubsub "cloud.google.com/go/internal/pubsub" pb "cloud.google.com/go/pubsub/apiv1/pubsubpb" "cloud.google.com/go/pubsub/internal/scheduler" "github.com/google/uuid" gax "github.com/googleapis/gax-go/v2" + semconv "go.opentelemetry.io/otel/semconv/v1.26.0" + "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -56,6 +59,10 @@ type Subscription struct { // This indicates to the server that any guarantees made for a stream that // disconnected will be made for the stream that is created to replace it. clientID string + // enableTracing enable otel tracing of Pub/Sub messages on this subscription. + // This is configured at client instantiation, and allows + // disabling of tracing even when a tracer provider is detected. + enableTracing bool } // Subscription creates a reference to a subscription. @@ -65,10 +72,16 @@ func (c *Client) Subscription(id string) *Subscription { // SubscriptionInProject creates a reference to a subscription in a given project. func (c *Client) SubscriptionInProject(id, projectID string) *Subscription { + return newSubscription(c, fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id)) +} + +func newSubscription(c *Client, name string) *Subscription { return &Subscription{ - c: c, - name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id), - clientID: uuid.NewString(), + c: c, + name: name, + clientID: uuid.NewString(), + ReceiveSettings: DefaultReceiveSettings, + enableTracing: c.enableTracing, } } @@ -118,7 +131,7 @@ func (subs *SubscriptionIterator) Next() (*Subscription, error) { if err != nil { return nil, err } - return &Subscription{c: subs.c, name: subName}, nil + return newSubscription(subs.c, subName), nil } // NextConfig returns the next subscription config. 
If there are no more subscriptions,
@@ -1319,6 +1332,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
 	// canceling that context would immediately stop the iterator without
 	// waiting for unacked messages.
 	iter := newMessageIterator(s.c.subc, s.name, po)
+	iter.enableTracing = s.enableTracing

 	// We cannot use errgroup from Receive here. Receive might already be
 	// calling group.Wait, and group.Wait cannot be called concurrently with
@@ -1363,6 +1377,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
 			return nil
 		default:
 		}
+
 		msgs, err := iter.receive(maxToPull)
 		if err == io.EOF {
 			return nil
@@ -1380,9 +1395,34 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
 			return nil
 		default:
 		}
+
 		for i, msg := range msgs {
 			msg := msg
-			// TODO(jba): call acquire closer to when the message is allocated.
+			iter.eoMu.RLock()
+			ackh, _ := msgAckHandler(msg, iter.enableExactlyOnceDelivery)
+			iter.eoMu.RUnlock()
+			// otelCtx is used to propagate the main subscribe span to the other child spans.
+			// We want this to derive from the main subscribe ctx, so the iterator remains
+			// cancellable.
+			// We cannot reassign into ctx2 directly since this ctx should be different per
+			// batch of messages and also per message iterator.
+			otelCtx := ctx2
+			// Stores the concurrency control span, which starts before the call to
+			// acquire is made, and ends immediately after. This used to be called
+			// flow control, but is more accurately described as concurrency control,
+			// since it limits the number of simultaneous callback invocations.
+			var ccSpan trace.Span
+			if iter.enableTracing {
+				c, ok := iter.activeSpans.Load(ackh.ackID)
+				if ok {
+					sc := c.(trace.Span)
+					otelCtx = trace.ContextWithSpanContext(otelCtx, sc.SpanContext())
+					// Don't override otelCtx here since the parent of subsequent spans
+					// should still be the subscribe span.
+					_, ccSpan = startSpan(otelCtx, ccSpanName, "")
+				}
+			}
+			// Use the original user-defined ctx for this operation so the acquire operation can be cancelled.
 			if err := fc.acquire(ctx, len(msg.Data)); err != nil {
 				// TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done.
 				for _, m := range msgs[i:] {
@@ -1391,9 +1431,9 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
 				// Return nil if the context is done, not err.
 				return nil
 			}
-			iter.eoMu.RLock()
-			msgAckHandler(msg, iter.enableExactlyOnceDelivery)
-			iter.eoMu.RUnlock()
+			if iter.enableTracing {
+				ccSpan.End()
+			}
 			wg.Add(1)
 			// Only schedule messages in order if an ordering key is present and the subscriber client
@@ -1403,14 +1443,44 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
 			if iter.enableOrdering {
 				key = msg.OrderingKey
 			}
+			// TODO(deklerk): Can we have a generic handler at the
+			// constructor level?
+			var schedulerSpan trace.Span
+			if iter.enableTracing {
+				_, schedulerSpan = startSpan(otelCtx, scheduleSpanName, "")
+			}
 			iter.orderingMu.RUnlock()
 			msgLen := len(msg.Data)
 			if err := sched.Add(key, msg, func(msg interface{}) {
+				m := msg.(*Message)
 				defer wg.Done()
+				var ps trace.Span
+				if iter.enableTracing {
+					schedulerSpan.End()
+					// Start the process span, and augment the done function to end this span and record events.
+					otelCtx, ps = startSpan(otelCtx, processSpanName, s.ID())
+					old := ackh.doneFunc
+					ackh.doneFunc = func(ackID string, ack bool, r *ipubsub.AckResult, receiveTime time.Time) {
+						var eventString string
+						if ack {
+							eventString = eventAckCalled
+						} else {
+							eventString = eventNackCalled
+						}
+						ps.AddEvent(eventString)
+						// This is the process operation, but is currently named "Deliver". Replace once
+						// updated here: https://github.com/open-telemetry/opentelemetry-go/blob/eb6bd28f3288b173d148c67f9ed45390594abdc2/semconv/v1.26.0/attribute_group.go#L5240
+						ps.SetAttributes(semconv.MessagingOperationTypeDeliver)
+						ps.End()
+						old(ackID, ack, r, receiveTime)
+					}
+				}
 				defer fc.release(ctx, msgLen)
-				f(ctx2, msg.(*Message))
+				f(otelCtx, m)
 			}); err != nil {
 				wg.Done()
+				// TODO(hongalex): propagate these errors to an otel span.
+
 				// If there are any errors with scheduling messages,
 				// nack them so they can be redelivered.
 				msg.Nack()
diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go
index e98f5c9f0f7d..d2cdfe3f86fb 100644
--- a/pubsub/subscription_test.go
+++ b/pubsub/subscription_test.go
@@ -350,7 +350,9 @@ func newFake(t *testing.T) (*Client, *pstest.Server) {
 	client, err := NewClient(ctx, projName,
 		option.WithEndpoint(srv.Addr),
 		option.WithoutAuthentication(),
-		option.WithGRPCDialOption(grpc.WithInsecure()))
+		option.WithGRPCDialOption(grpc.WithInsecure()),
+		option.WithTelemetryDisabled(),
+	)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/pubsub/topic.go b/pubsub/topic.go
index b953cb4d1bcb..1991fa7f03ce 100644
--- a/pubsub/topic.go
+++ b/pubsub/topic.go
@@ -33,6 +33,9 @@ import (
 	gax "github.com/googleapis/gax-go/v2"
 	"go.opencensus.io/stats"
 	"go.opencensus.io/tag"
+	"go.opentelemetry.io/otel/attribute"
+	semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
+	"go.opentelemetry.io/otel/trace"
 	"google.golang.org/api/support/bundler"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
@@ -83,6 +86,11 @@ type Topic struct {

 	// EnableMessageOrdering enables delivery of ordered keys.
 	EnableMessageOrdering bool
+
+	// enableTracing enables OTel tracing of Pub/Sub messages on this topic.
+	// This is configured at client instantiation, and allows
+	// disabling tracing even when a tracer provider is detected.
+	enableTracing bool
 }

 // PublishSettings control the bundling of published messages.
@@ -215,6 +223,7 @@ func newTopic(c *Client, name string) *Topic {
 		c:               c,
 		name:            name,
 		PublishSettings: DefaultPublishSettings,
+		enableTracing:   c.enableTracing,
 	}
 }

@@ -736,6 +745,12 @@ var errTopicOrderingNotEnabled = errors.New("Topic.EnableMessageOrdering=false, 
 // need to be stopped by calling t.Stop(). Once stopped, future calls to Publish
 // will immediately return a PublishResult with an error.
 func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
+	var createSpan trace.Span
+	if t.enableTracing {
+		opts := getPublishSpanAttributes(t.c.projectID, t.ID(), msg)
+		ctx, createSpan = startSpan(ctx, createSpanName, t.ID(), opts...)
+		createSpan.SetAttributes(semconv.CodeFunction("Publish"))
+	}
 	ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name))
 	if err != nil {
 		log.Printf("pubsub: cannot create context with tag in Publish: %v", err)
@@ -744,6 +759,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
 	r := ipubsub.NewPublishResult()
 	if !t.EnableMessageOrdering && msg.OrderingKey != "" {
 		ipubsub.SetPublishResult(r, "", errTopicOrderingNotEnabled)
+		spanRecordError(createSpan, errTopicOrderingNotEnabled)
 		return r
 	}
@@ -754,25 +770,57 @@
 		Attributes:  msg.Attributes,
 		OrderingKey: msg.OrderingKey,
 	})
+	if t.enableTracing {
+		createSpan.SetAttributes(semconv.MessagingMessageBodySize(len(msg.Data)))
+	}
 	t.initBundler()
 	t.mu.RLock()
 	defer t.mu.RUnlock()
 	if t.stopped {
 		ipubsub.SetPublishResult(r, "", ErrTopicStopped)
+		spanRecordError(createSpan, ErrTopicStopped)
 		return r
 	}
+	var batcherSpan trace.Span
+	var fcSpan trace.Span
+
+	if t.enableTracing {
+		_, fcSpan = startSpan(ctx, publishFCSpanName, "")
+	}
 	if err := t.flowController.acquire(ctx, msgSize); err != nil {
 		t.scheduler.Pause(msg.OrderingKey)
 		ipubsub.SetPublishResult(r, "", err)
		spanRecordError(fcSpan, err)
 		return r
 	}
-	err = t.scheduler.Add(msg.OrderingKey, &bundledMessage{msg, r, msgSize}, msgSize)
-	if err != nil {
+	if t.enableTracing {
+		fcSpan.End()
+		// Guard batcher span creation as well, so no spans are created when
+		// tracing is disabled (see TestTrace_TracingNotEnabled).
+		_, batcherSpan = startSpan(ctx, batcherSpanName, "")
+	}
+
+	bmsg := &bundledMessage{
+		msg:        msg,
+		res:        r,
+		size:       msgSize,
+		createSpan: createSpan,
+	}
+
+	if t.enableTracing {
+		bmsg.batcherSpan = batcherSpan
+
+		// Inject the context from the first publish span rather than from flow control / batching.
+		injectPropagation(ctx, msg)
+	}
+
+	if err := t.scheduler.Add(msg.OrderingKey, bmsg, msgSize); err != nil {
 		t.scheduler.Pause(msg.OrderingKey)
 		ipubsub.SetPublishResult(r, "", err)
+		spanRecordError(createSpan, err)
 	}
+
 	return r
 }
@@ -802,6 +850,10 @@ type bundledMessage struct {
 	msg  *Message
 	res  *PublishResult
 	size int
+	// createSpan covers the entire publish operation, from the user's Publish call until the publish RPC resolves.
+	createSpan trace.Span
+	// batcherSpan traces the message batching operation in the publish scheduler.
+	batcherSpan trace.Span
 }

 func (t *Topic) initBundler() {
@@ -829,14 +881,23 @@
 	}
 	t.scheduler = scheduler.NewPublishScheduler(workers, func(bundle interface{}) {
-		// TODO(jba): use a context detached from the one passed to NewClient.
-		ctx := context.TODO()
+		// Use a context detached from the one passed to NewClient.
+ ctx := context.Background() if timeout != 0 { var cancel func() ctx, cancel = context.WithTimeout(ctx, timeout) defer cancel() } - t.publishMessageBundle(ctx, bundle.([]*bundledMessage)) + bmsgs := bundle.([]*bundledMessage) + if t.enableTracing { + for _, m := range bmsgs { + m.batcherSpan.End() + m.createSpan.AddEvent(eventPublishStart, trace.WithAttributes(semconv.MessagingBatchMessageCount(len(bmsgs)))) + defer m.createSpan.End() + defer m.createSpan.AddEvent(eventPublishEnd) + } + } + t.publishMessageBundle(ctx, bmsgs) }) t.scheduler.DelayThreshold = t.PublishSettings.DelayThreshold t.scheduler.BundleCountThreshold = t.PublishSettings.CountThreshold @@ -889,11 +950,47 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) if err != nil { log.Printf("pubsub: cannot create context with tag in publishMessageBundle: %v", err) } - pbMsgs := make([]*pb.PubsubMessage, len(bms)) + numMsgs := len(bms) + pbMsgs := make([]*pb.PubsubMessage, numMsgs) var orderingKey string - batchSize := 0 + if numMsgs != 0 { + // extract the ordering key for this batch. since + // messages in the same batch share the same ordering + // key, it doesn't matter which we read from. + orderingKey = bms[0].msg.OrderingKey + } + + if t.enableTracing { + links := make([]trace.Link, 0, numMsgs) + for _, bm := range bms { + if bm.createSpan.SpanContext().IsSampled() { + links = append(links, trace.Link{SpanContext: bm.createSpan.SpanContext()}) + } + } + + projectID, topicID := parseResourceName(t.name) + var pSpan trace.Span + opts := getCommonOptions(projectID, topicID) + // Add link to publish RPC span of createSpan(s). + opts = append(opts, trace.WithLinks(links...)) + ctx, pSpan = startSpan(ctx, publishRPCSpanName, topicID, opts...) + pSpan.SetAttributes(semconv.MessagingBatchMessageCount(numMsgs), semconv.CodeFunction("publishMessageBundle")) + defer pSpan.End() + + // Add the reverse link to createSpan(s) of publish RPC span. 
+	if pSpan.SpanContext().IsSampled() {
+		for _, bm := range bms {
+			bm.createSpan.AddLink(trace.Link{
+				SpanContext: pSpan.SpanContext(),
+				Attributes: []attribute.KeyValue{
+					semconv.MessagingOperationName(publishRPCSpanName),
+				},
+			})
+		}
+	}
+	}
+	var batchSize int
 	for i, bm := range bms {
-		orderingKey = bm.msg.OrderingKey
 		pbMsgs[i] = &pb.PubsubMessage{
 			Data:       bm.msg.Data,
 			Attributes: bm.msg.Attributes,
@@ -902,6 +999,7 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage)
 		batchSize = batchSize + proto.Size(pbMsgs[i])
 		bm.msg = nil // release bm.msg for GC
 	}
+
 	var res *pb.PublishResponse
 	start := time.Now()
 	if orderingKey != "" && t.scheduler.IsPaused(orderingKey) {
@@ -942,8 +1040,12 @@
 		t.flowController.release(ctx, bm.size)
 		if err != nil {
 			ipubsub.SetPublishResult(bm.res, "", err)
+			spanRecordError(bm.createSpan, err)
 		} else {
 			ipubsub.SetPublishResult(bm.res, res.MessageIds[i], nil)
+			if t.enableTracing {
+				bm.createSpan.SetAttributes(semconv.MessagingMessageIDKey.String(res.MessageIds[i]))
+			}
 		}
 	}
 }
diff --git a/pubsub/trace.go b/pubsub/trace.go
index cadc3eb6d50a..1d41e9d89422 100644
--- a/pubsub/trace.go
+++ b/pubsub/trace.go
@@ -16,12 +16,20 @@ package pubsub

 import (
 	"context"
+	"fmt"
 	"log"
 	"sync"

+	"cloud.google.com/go/pubsub/internal"
 	"go.opencensus.io/stats"
 	"go.opencensus.io/stats/view"
 	"go.opencensus.io/tag"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	otelcodes "go.opentelemetry.io/otel/codes"
+	"go.opentelemetry.io/otel/propagation"
+	semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
+	"go.opentelemetry.io/otel/trace"
 )

 // The following keys are used to tag requests with a specific topic/subscription ID.
@@ -256,3 +264,160 @@ func withSubscriptionKey(ctx context.Context, subName string) context.Context {
 func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) {
 	stats.Record(ctx, m.M(n))
 }
+
+const defaultTracerName = "cloud.google.com/go/pubsub"
+
+func tracer() trace.Tracer {
+	return otel.Tracer(defaultTracerName, trace.WithInstrumentationVersion(internal.Version))
+}
+
+var _ propagation.TextMapCarrier = (*messageCarrier)(nil)
+
+// messageCarrier injects and extracts traces from a pubsub.Message.
+type messageCarrier struct {
+	msg *Message
+}
+
+const googclientPrefix string = "googclient_"
+
+// newMessageCarrier creates a new messageCarrier.
+func newMessageCarrier(msg *Message) messageCarrier {
+	return messageCarrier{msg: msg}
+}
+
+// Get retrieves a single value for a given key.
+func (c messageCarrier) Get(key string) string {
+	return c.msg.Attributes[googclientPrefix+key]
+}
+
+// Set sets an attribute.
+func (c messageCarrier) Set(key, val string) {
+	c.msg.Attributes[googclientPrefix+key] = val
+}
+
+// Keys returns a slice of all keys in the carrier.
+func (c messageCarrier) Keys() []string {
+	i := 0
+	out := make([]string, len(c.msg.Attributes))
+	for k := range c.msg.Attributes {
+		out[i] = k
+		i++
+	}
+	return out
+}
+
+// injectPropagation injects context data into the Pub/Sub message's Attributes field.
+func injectPropagation(ctx context.Context, msg *Message) {
+	// Only inject propagation if a valid span context was detected.
+ if trace.SpanFromContext(ctx).SpanContext().IsValid() { + if msg.Attributes == nil { + msg.Attributes = make(map[string]string) + } + propagation.TraceContext{}.Inject(ctx, newMessageCarrier(msg)) + } +} + +const ( + // publish span names + createSpanName = "create" + publishFCSpanName = "publisher flow control" + batcherSpanName = "publisher batching" + publishRPCSpanName = "publish" + + // subscribe span names + subscribeSpanName = "subscribe" + ccSpanName = "subscriber concurrency control" + processSpanName = "process" + scheduleSpanName = "subscribe scheduler" + modackSpanName = "modack" + ackSpanName = "ack" + nackSpanName = "nack" + + // event names + eventPublishStart = "publish start" + eventPublishEnd = "publish end" + eventModackStart = "modack start" + eventModackEnd = "modack end" + eventAckStart = "ack start" + eventAckEnd = "ack end" + eventNackStart = "nack start" + eventNackEnd = "nack end" + eventAckCalled = "ack called" + eventNackCalled = "nack called" + + resultAcked = "acked" + resultNacked = "nacked" + resultExpired = "expired" + + // custom pubsub specific attributes + gcpProjectIDAttribute = "gcp.project_id" + pubsubPrefix = "messaging.gcp_pubsub." + orderingAttribute = pubsubPrefix + "message.ordering_key" + deliveryAttemptAttribute = pubsubPrefix + "message.delivery_attempt" + eosAttribute = pubsubPrefix + "exactly_once_delivery" + ackIDAttribute = pubsubPrefix + "message.ack_id" + resultAttribute = pubsubPrefix + "result" + receiptModackAttribute = pubsubPrefix + "is_receipt_modack" +) + +func startSpan(ctx context.Context, spanType, resourceID string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { + spanName := spanType + if resourceID != "" { + spanName = fmt.Sprintf("%s %s", resourceID, spanType) + } + return tracer().Start(ctx, spanName, opts...) +} + +func getPublishSpanAttributes(project, dst string, msg *Message, attrs ...attribute.KeyValue) []trace.SpanStartOption { + opts := []trace.SpanStartOption{ + trace.WithAttributes( + semconv.MessagingMessageID(msg.ID), + semconv.MessagingMessageBodySize(len(msg.Data)), + attribute.String(orderingAttribute, msg.OrderingKey), + ), + trace.WithAttributes(attrs...), + trace.WithSpanKind(trace.SpanKindProducer), + } + opts = append(opts, getCommonOptions(project, dst)...) + return opts +} + +func getSubscriberOpts(project, dst string, msg *Message, attrs ...attribute.KeyValue) []trace.SpanStartOption { + opts := []trace.SpanStartOption{ + trace.WithAttributes( + semconv.MessagingMessageID(msg.ID), + semconv.MessagingMessageBodySize(len(msg.Data)), + attribute.String(orderingAttribute, msg.OrderingKey), + ), + trace.WithAttributes(attrs...), + trace.WithSpanKind(trace.SpanKindConsumer), + } + if msg.DeliveryAttempt != nil { + opts = append(opts, trace.WithAttributes(attribute.Int(deliveryAttemptAttribute, *msg.DeliveryAttempt))) + } + opts = append(opts, getCommonOptions(project, dst)...) + return opts +} + +func getCommonOptions(projectID, destination string) []trace.SpanStartOption { + opts := []trace.SpanStartOption{ + trace.WithAttributes( + attribute.String(gcpProjectIDAttribute, projectID), + semconv.MessagingSystemGCPPubsub, + semconv.MessagingDestinationName(destination), + ), + } + return opts + +} + +// spanRecordError records the error, sets the status to error, and ends the span. +// This is recommended by https://opentelemetry.io/docs/instrumentation/go/manual/#record-errors +// since RecordError doesn't set the status of a span. 
+func spanRecordError(span trace.Span, err error) {
+	if span != nil {
+		span.RecordError(err)
+		span.SetStatus(otelcodes.Error, err.Error())
+		span.End()
+	}
+}
diff --git a/pubsub/trace_test.go b/pubsub/trace_test.go
new file mode 100644
index 000000000000..ba98aea05441
--- /dev/null
+++ b/pubsub/trace_test.go
@@ -0,0 +1,730 @@
+// Copyright 2024 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"slices"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"cloud.google.com/go/internal/testutil"
+	"cloud.google.com/go/pubsub/internal"
+	"cloud.google.com/go/pubsub/pstest"
+	"github.com/google/go-cmp/cmp"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+	"go.opentelemetry.io/otel/propagation"
+	"go.opentelemetry.io/otel/sdk/instrumentation"
+	sdktrace "go.opentelemetry.io/otel/sdk/trace"
+	"go.opentelemetry.io/otel/sdk/trace/tracetest"
+	semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
+	"go.opentelemetry.io/otel/trace"
+	"google.golang.org/api/option"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+)
+
+func TestTrace_MessageCarrier(t *testing.T) {
+	ctx := context.Background()
+	e := tracetest.NewInMemoryExporter()
+	tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(e))
+	otel.SetTextMapPropagator(propagation.TraceContext{})
+	defer tp.Shutdown(ctx)
+	otel.SetTracerProvider(tp)
+
+	ctx, _ = tp.Tracer("a").Start(ctx, "fake-span")
+	msg := &Message{
+		Data:        []byte("asdf"),
+		OrderingKey: "asdf",
+		Attributes:  map[string]string{},
+	}
+	otel.GetTextMapPropagator().Inject(ctx, newMessageCarrier(msg))
+
+	if _, ok := msg.Attributes[googclientPrefix+"traceparent"]; !ok {
+		t.Fatalf("expected traceparent in message attributes, found none")
+	}
+
+	newCtx := propagation.TraceContext{}.Extract(context.Background(), newMessageCarrier(msg))
+	if !trace.SpanContextFromContext(newCtx).IsValid() {
+		t.Fatalf("expected a valid span context extracted from message attributes")
+	}
+}
+
+func TestTrace_PublishSpan(t *testing.T) {
+	ctx := context.Background()
+	c, srv := newFakeWithTracing(t)
+	defer c.Close()
+	defer srv.Close()
+
+	e := tracetest.NewInMemoryExporter()
+	g := &incrementIDGenerator{}
+	tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(e), sdktrace.WithIDGenerator(g))
+	defer tp.Shutdown(ctx)
+	otel.SetTracerProvider(tp)
+
+	m := &Message{
+		Data:        []byte("test"),
+		OrderingKey: "my-key",
+	}
+
+	topicID := "t"
+
+	expectedSpans := tracetest.SpanStubs{
+		tracetest.SpanStub{
+			Name:     fmt.Sprintf("%s %s", topicID, createSpanName),
+			SpanKind: trace.SpanKindProducer,
+			Attributes: []attribute.KeyValue{
+				semconv.CodeFunction("Publish"),
+				semconv.MessagingDestinationName(topicID),
+				attribute.String(orderingAttribute, m.OrderingKey),
+				// Hardcoded since the fake server always returns m0 first.
+				semconv.MessagingMessageIDKey.String("m0"),
+				semconv.MessagingSystemGCPPubsub,
+				semconv.MessagingMessageBodySize(len(m.Data)),
+				attribute.String(gcpProjectIDAttribute, projName),
+			},
+			Events: []sdktrace.Event{
+				{
+					Name: eventPublishStart,
+					Attributes: []attribute.KeyValue{
+						semconv.MessagingBatchMessageCount(1),
+					},
+				},
+				{
+					Name: eventPublishEnd,
+				},
+			},
+			Links: []sdktrace.Link{
+				{
+					SpanContext: trace.NewSpanContext(trace.SpanContextConfig{
+						TraceID:    [16]byte{(byte(2))},
+						SpanID:     [8]byte{(byte(4))},
+						TraceFlags: 1,
+					}),
+				},
+			},
+			ChildSpanCount: 2,
+			InstrumentationLibrary: instrumentation.Scope{
+				Name:    "cloud.google.com/go/pubsub",
+				Version: internal.Version,
+			},
+		},
+		tracetest.SpanStub{
+			Name: publishFCSpanName,
+			InstrumentationLibrary: instrumentation.Scope{
+				Name:    "cloud.google.com/go/pubsub",
+				Version: internal.Version,
+			},
+		},
+		tracetest.SpanStub{
+			Name: batcherSpanName,
+			InstrumentationLibrary: instrumentation.Scope{
+				Name:    "cloud.google.com/go/pubsub",
+				Version: internal.Version,
+			},
+		},
+		tracetest.SpanStub{
+			Name: fmt.Sprintf("%s %s", topicID, publishRPCSpanName),
+			Attributes: []attribute.KeyValue{
+				semconv.MessagingSystemGCPPubsub,
+				semconv.MessagingDestinationName(topicID),
+				semconv.CodeFunction("publishMessageBundle"),
+				semconv.MessagingBatchMessageCount(1),
+				attribute.String(gcpProjectIDAttribute, projName),
+			},
+			InstrumentationLibrary: instrumentation.Scope{
+				Name:    "cloud.google.com/go/pubsub",
+				Version: internal.Version,
+			},
+			Links: []sdktrace.Link{
+				{
+					SpanContext: trace.NewSpanContext(trace.SpanContextConfig{
+						TraceID:    [16]byte{(byte(1))},
+						SpanID:     [8]byte{(byte(1))},
+						TraceFlags: 1,
+					}),
+				},
+			},
+		},
+	}
+
+	topic, err := c.CreateTopic(ctx, topicID)
+	if err != nil {
+		t.Fatalf("failed to create topic: %v", err)
+	}
+	if m.OrderingKey != "" {
+		topic.EnableMessageOrdering = true
+	}
+	r := topic.Publish(ctx, m)
+	_, err = r.Get(ctx)
+	if err != nil {
+		t.Fatalf("failed to publish message: %v", err)
+	}
+	defer topic.Stop()
+
+	got := getSpans(e)
+	slices.SortFunc(expectedSpans, func(a, b tracetest.SpanStub) int {
+		return sortSpanStub(a, b)
+	})
+	compareSpans(t, got, expectedSpans)
+}
+
+func TestTrace_PublishSpanError(t *testing.T) {
+	ctx := context.Background()
+	c, srv := newFakeWithTracing(t)
+	defer c.Close()
+	defer srv.Close()
+
+	e := tracetest.NewInMemoryExporter()
+	tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(e))
+	defer tp.Shutdown(ctx)
+	otel.SetTracerProvider(tp)
+
+	m := &Message{
+		Data:        []byte("test"),
+		OrderingKey: "m",
+	}
+
+	topicID := "t"
+
+	topic, err := c.CreateTopic(ctx, topicID)
+	if err != nil {
+		t.Fatalf("failed to create topic: %v", err)
+	}
+
+	// Publishing a message with an ordering key without enabling topic ordering
+	// should fail.
+ t.Run("no ordering key", func(t *testing.T) { + r := topic.Publish(ctx, m) + _, err = r.Get(ctx) + if err == nil { + t.Fatal("expected err, got nil") + } + + want := getPublishSpanStubsWithError(topicID, m, errTopicOrderingNotEnabled) + + got := getSpans(e) + opts := []cmp.Option{ + cmp.Comparer(spanStubComparer), + } + if diff := testutil.Diff(got, want, opts...); diff != "" { + t.Errorf("diff: -got, +want:\n%s\n", diff) + } + e.Reset() + topic.ResumePublish(m.OrderingKey) + }) + + t.Run("stopped topic", func(t *testing.T) { + // Publishing a message with a stopped publisher should fail too + topic.ResumePublish(m.OrderingKey) + topic.EnableMessageOrdering = true + topic.Stop() + r := topic.Publish(ctx, m) + _, err = r.Get(ctx) + if err == nil { + t.Fatal("expected err, got nil") + } + + got := getSpans(e) + want := getPublishSpanStubsWithError(topicID, m, ErrTopicStopped) + opts := []cmp.Option{ + cmp.Comparer(spanStubComparer), + } + if diff := testutil.Diff(got, want, opts...); diff != "" { + t.Errorf("diff: -got, +want:\n%s\n", diff) + } + e.Reset() + topic.ResumePublish(m.OrderingKey) + }) + + t.Run("flow control error", func(t *testing.T) { + // Use a different topic here than above since + // we need to adjust the flow control settings, + // which are immutable after publish. + topicID := "t2" + + topic, err := c.CreateTopic(ctx, topicID) + if err != nil { + t.Fatalf("failed to create topic: %v", err) + } + topic.EnableMessageOrdering = true + topic.PublishSettings.FlowControlSettings = FlowControlSettings{ + LimitExceededBehavior: FlowControlSignalError, + MaxOutstandingBytes: 1, + } + + r := topic.Publish(ctx, m) + _, err = r.Get(ctx) + if err == nil { + t.Fatal("expected err, got nil") + } + + got := getSpans(e) + want := getFlowControlSpanStubs(ErrFlowControllerMaxOutstandingBytes) + opts := []cmp.Option{ + cmp.Comparer(spanStubComparer), + } + if diff := testutil.Diff(got, want, opts...); diff != "" { + t.Errorf("diff: -got, +want:\n%s\n", diff) + } + }) +} + +func TestTrace_SubscribeSpans(t *testing.T) { + ctx := context.Background() + c, srv := newFakeWithTracing(t) + defer c.Close() + defer srv.Close() + + // For subscribe spans, we'll publish before setting the tracer provider + // so we don't trace the publish spans. Context propagation is tested separately. + m := &Message{ + Data: []byte("test"), + OrderingKey: "my-key", + } + + topicID := "t" + + topic, err := c.CreateTopic(ctx, topicID) + if err != nil { + t.Fatalf("failed to create topic: %v", err) + } + + subID := "s" + enableEOS := false + + sub, err := c.CreateSubscription(ctx, subID, SubscriptionConfig{ + Topic: topic, + EnableExactlyOnceDelivery: enableEOS, + }) + if err != nil { + t.Fatalf("failed to create subscription: %v", err) + } + if m.OrderingKey != "" { + topic.EnableMessageOrdering = true + } + + // Call publish before enabling tracer provider to only test subscribe spans. + r := topic.Publish(ctx, m) + _, err = r.Get(ctx) + if err != nil { + t.Fatalf("failed to publish message: %v", err) + } + + e := tracetest.NewInMemoryExporter() + g := &incrementIDGenerator{} + tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(e), sdktrace.WithIDGenerator(g)) + defer tp.Shutdown(ctx) + otel.SetTracerProvider(tp) + + ctx, cancel := context.WithCancel(ctx) + + sub.Receive(ctx, func(ctx context.Context, m *Message) { + // Add artificial processsing time so the message isn't acked before the modack can be sent out. 
+ time.Sleep(1 * time.Second) + m.Ack() + cancel() + }) + + expectedSpans := tracetest.SpanStubs{ + tracetest.SpanStub{ + Name: fmt.Sprintf("%s %s", subID, processSpanName), + SpanKind: trace.SpanKindInternal, + Attributes: []attribute.KeyValue{ + semconv.MessagingOperationTypeDeliver, + }, + Events: []sdktrace.Event{ + { + Name: eventAckCalled, + }, + }, + InstrumentationLibrary: instrumentation.Scope{ + Name: "cloud.google.com/go/pubsub", + Version: internal.Version, + }, + }, + tracetest.SpanStub{ + Name: fmt.Sprintf("%s %s", subID, subscribeSpanName), + SpanKind: trace.SpanKindConsumer, + Attributes: []attribute.KeyValue{ + semconv.CodeFunction("receive"), + semconv.MessagingBatchMessageCount(1), + semconv.MessagingDestinationName(subID), + attribute.Bool(eosAttribute, enableEOS), + // Hardcoded since the fake server always returns m0 first. + semconv.MessagingMessageIDKey.String("m0"), + // The fake server uses message ID as ackID, this is not the case with live service. + attribute.String(ackIDAttribute, "m0"), + attribute.String(orderingAttribute, m.OrderingKey), + attribute.String(resultAttribute, resultAcked), + semconv.MessagingSystemGCPPubsub, + semconv.MessagingMessageBodySize(len(m.Data)), + attribute.String(gcpProjectIDAttribute, projName), + }, + Events: []sdktrace.Event{ + { + Name: eventModackStart, + Attributes: []attribute.KeyValue{ + semconv.MessagingBatchMessageCount(1), + }, + }, + { + Name: eventModackEnd, + }, + { + Name: eventAckStart, + Attributes: []attribute.KeyValue{ + semconv.MessagingBatchMessageCount(1), + }, + }, + { + Name: eventAckEnd, + }, + }, + // ChildSpanCount: 3, + InstrumentationLibrary: instrumentation.Scope{ + Name: "cloud.google.com/go/pubsub", + Version: internal.Version, + }, + }, + tracetest.SpanStub{ + Name: scheduleSpanName, + InstrumentationLibrary: instrumentation.Scope{ + Name: "cloud.google.com/go/pubsub", + Version: internal.Version, + }, + }, + tracetest.SpanStub{ + Name: ccSpanName, + InstrumentationLibrary: instrumentation.Scope{ + Name: "cloud.google.com/go/pubsub", + Version: internal.Version, + }, + }, + tracetest.SpanStub{ + Name: fmt.Sprintf("%s %s", subID, ackSpanName), + Links: []sdktrace.Link{ + { + SpanContext: trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: [16]byte{(byte(1))}, + SpanID: [8]byte{(byte(1))}, + TraceFlags: 1, + }), + }, + }, + Attributes: []attribute.KeyValue{ + semconv.CodeFunction("sendAck"), + semconv.MessagingBatchMessageCount(1), + semconv.MessagingSystemGCPPubsub, + semconv.MessagingDestinationName(subID), + attribute.String(gcpProjectIDAttribute, projName), + }, + InstrumentationLibrary: instrumentation.Scope{ + Name: "cloud.google.com/go/pubsub", + Version: internal.Version, + }, + }, + tracetest.SpanStub{ + Name: fmt.Sprintf("%s %s", subID, modackSpanName), + InstrumentationLibrary: instrumentation.Scope{ + Name: "cloud.google.com/go/pubsub", + Version: internal.Version, + }, + Links: []sdktrace.Link{ + { + SpanContext: trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: [16]byte{(byte(1))}, + SpanID: [8]byte{(byte(1))}, + TraceFlags: 1, + }), + }, + }, + Attributes: []attribute.KeyValue{ + semconv.CodeFunction("sendModAck"), + attribute.Bool(receiptModackAttribute, true), + semconv.MessagingGCPPubsubMessageAckDeadline(10), + semconv.MessagingBatchMessageCount(1), + semconv.MessagingSystemGCPPubsub, + semconv.MessagingDestinationName(subID), + attribute.String(gcpProjectIDAttribute, projName), + }, + }, + } + + got := getSpans(e) + + slices.SortFunc(expectedSpans, func(a, b 
tracetest.SpanStub) int {
+		return sortSpanStub(a, b)
+	})
+	compareSpans(t, got, expectedSpans)
+}
+
+func TestTrace_TracingNotEnabled(t *testing.T) {
+	ctx := context.Background()
+	c, srv := newFake(t)
+	defer c.Close()
+	defer srv.Close()
+
+	e := tracetest.NewInMemoryExporter()
+	tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(e))
+	defer tp.Shutdown(ctx)
+	otel.SetTracerProvider(tp)
+
+	m := &Message{
+		Data: []byte("test"),
+	}
+
+	topicID := "t"
+	subID := "s"
+
+	topic, err := c.CreateTopic(ctx, topicID)
+	if err != nil {
+		t.Fatalf("failed to create topic: %v", err)
+	}
+	sub, err := c.CreateSubscription(ctx, subID, SubscriptionConfig{
+		Topic: topic,
+	})
+	if err != nil {
+		t.Fatalf("failed to create subscription: %v", err)
+	}
+
+	r := topic.Publish(ctx, m)
+	_, err = r.Get(ctx)
+	if err != nil {
+		t.Fatalf("failed to publish message: %v", err)
+	}
+
+	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+	sub.Receive(ctx, func(ctx context.Context, msg *Message) {
+		msg.Ack()
+		cancel()
+	})
+
+	got := getSpans(e)
+	if len(got) != 0 {
+		t.Fatalf("expected no spans, got %d", len(got))
+	}
+}
+
+func spanStubComparer(a, b tracetest.SpanStub) bool {
+	if a.Name != b.Name {
+		fmt.Printf("a.Name: %s\nb.Name: %s\n", a.Name, b.Name)
+		return false
+	}
+	if a.ChildSpanCount != b.ChildSpanCount {
+		fmt.Printf("a.ChildSpanCount: %d\nb.ChildSpanCount: %d\n", a.ChildSpanCount, b.ChildSpanCount)
+		return false
+	}
+	// Compare attribute counts only; attribute.NewSet plus Set.Equals would
+	// give a stricter, order-insensitive comparison.
+	if len(a.Attributes) != len(b.Attributes) {
+		fmt.Printf("len mismatch: a.Attributes: %d, b.Attributes: %d\n", len(a.Attributes), len(b.Attributes))
+		return false
+	}
+	if a.InstrumentationLibrary != b.InstrumentationLibrary {
+		fmt.Printf("a.InstrumentationLibrary: %v\nb.InstrumentationLibrary: %v\n", a.InstrumentationLibrary, b.InstrumentationLibrary)
+		return false
+	}
+	if a.Status != b.Status {
+		fmt.Printf("a.Status: %v\nb.Status: %v\n", a.Status, b.Status)
+		return false
+	}
+	return true
+}
+
+func sortSpanStub(a, b tracetest.SpanStub) int {
+	if a.Name == b.Name {
+		return 0
+	} else if a.Name < b.Name {
+		return -1
+	} else {
+		return 1
+	}
+}
+
+func getSpans(e *tracetest.InMemoryExporter) tracetest.SpanStubs {
+	// Wait a fixed amount for spans to be fully exported.
+	time.Sleep(100 * time.Millisecond)
+
+	spans := e.GetSpans()
+
+	// Sort by span name so comparisons against expected spans are deterministic.
+	slices.SortFunc(spans, func(a, b tracetest.SpanStub) int {
+		return sortSpanStub(a, b)
+	})
+	return spans
+}
+
+func compareSpans(t *testing.T, got, want tracetest.SpanStubs) {
+	if len(got) != len(want) {
+		for _, span := range got {
+			t.Logf("got: %s\n", span.Name)
+		}
+		for _, span := range want {
+			t.Logf("want: %s\n", span.Name)
+		}
+		t.Errorf("got %d spans, want %d", len(got), len(want))
+	}
+	opts := []cmp.Option{
+		cmp.Comparer(spanStubComparer),
+	}
+	for i, span := range got {
+		wanti := want[i]
+		if diff := testutil.Diff(span, wanti, opts...); diff != "" {
+			t.Errorf("diff: -got, +want:\n%s\n", diff)
+		}
+	}
+}
+
+func getPublishSpanStubsWithError(topicID string, m *Message, err error) tracetest.SpanStubs {
+	return tracetest.SpanStubs{
+		tracetest.SpanStub{
+			Name:     fmt.Sprintf("%s %s", topicID, createSpanName),
+			SpanKind: trace.SpanKindProducer,
+			Attributes: []attribute.KeyValue{
+				semconv.CodeFunction("Publish"),
+				semconv.MessagingDestinationName(topicID),
+				semconv.MessagingMessageIDKey.String(""),
+				semconv.MessagingMessageBodySize(len(m.Data)),
+				attribute.String(orderingAttribute, m.OrderingKey),
+				semconv.MessagingSystemGCPPubsub,
+				attribute.String(gcpProjectIDAttribute, projName),
+			},
+			InstrumentationLibrary: instrumentation.Scope{
+				Name:    "cloud.google.com/go/pubsub",
+				Version: internal.Version,
+			},
+			Status: sdktrace.Status{
+				Code:        codes.Error,
+				Description: err.Error(),
+			},
+		},
+	}
+}
+
+func getFlowControlSpanStubs(err error) tracetest.SpanStubs {
+	return tracetest.SpanStubs{
+		tracetest.SpanStub{
+			Name: publishFCSpanName,
+			InstrumentationLibrary: instrumentation.Scope{
+				Name:    "cloud.google.com/go/pubsub",
+				Version: internal.Version,
+			},
+			Status: sdktrace.Status{
+				Code:        codes.Error,
+				Description: err.Error(),
+			},
+		},
+	}
+}
+
+func newFakeWithTracing(t *testing.T) (*Client, *pstest.Server) {
+	ctx := context.Background()
+	srv := pstest.NewServer()
+	client, err := NewClientWithConfig(ctx, projName,
+		&ClientConfig{EnableOpenTelemetryTracing: true},
+		option.WithEndpoint(srv.Addr),
+		option.WithoutAuthentication(),
+		option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
+		option.WithTelemetryDisabled(),
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return client, srv
+}
+
+var _ sdktrace.IDGenerator = &incrementIDGenerator{}
+
+type incrementIDGenerator struct {
+	tid int64
+	sid int64
+}
+
+func (i *incrementIDGenerator) NewSpanID(ctx context.Context, traceID trace.TraceID) trace.SpanID {
+	id := atomic.AddInt64(&i.sid, 1)
+	sid := [8]byte{byte(id)}
+	return sid
+}
+
+// NewIDs returns a non-zero trace ID and a non-zero span ID from a
+// monotonically increasing sequence, so tests see deterministic IDs.
+func (i *incrementIDGenerator) NewIDs(ctx context.Context) (trace.TraceID, trace.SpanID) { + id1 := atomic.AddInt64(&i.tid, 1) + id2 := atomic.AddInt64(&i.sid, 1) + + tid := [16]byte{byte(id1)} + sid := [8]byte{byte(id2)} + return tid, sid +} + +func BenchmarkNoTracingEnabled(b *testing.B) { + ctx := context.Background() + t := &testing.T{} + c, srv := newFake(t) + defer c.Close() + defer srv.Close() + + e := tracetest.NewInMemoryExporter() + tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(e)) + defer tp.Shutdown(ctx) + otel.SetTracerProvider(tp) + + m := &Message{ + Data: []byte("test"), + } + + topicID := "t" + subID := "s" + + topic, err := c.CreateTopic(ctx, topicID) + if err != nil { + b.Fatalf("failed to create topic: %v", err) + } + sub, err := c.CreateSubscription(ctx, subID, SubscriptionConfig{ + Topic: topic, + }) + if err != nil { + b.Fatalf("failed to create subscription: %v", err) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + r := topic.Publish(ctx, m) + _, err = r.Get(ctx) + if err != nil { + b.Fatalf("failed to publish message: %v", err) + } + + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + sub.Receive(ctx, func(ctx context.Context, msg *Message) { + msg.Ack() + cancel() + }) + + got := getSpans(e) + if len(got) != 0 { + b.Fatalf("expected no spans, got %d", len(got)) + } + + } +}
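As a usage reference for reviewers, a minimal end-to-end sketch of the option this change adds, using only APIs introduced in this diff. The project and topic IDs are placeholders, the topic is assumed to already exist, and the in-memory exporter stands in for a real OTLP or Cloud Trace exporter:

package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/pubsub"
	"go.opentelemetry.io/otel"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/sdk/trace/tracetest"
)

func main() {
	ctx := context.Background()

	// Register any SDK tracer provider; the library picks it up via
	// otel.Tracer at span creation time.
	exporter := tracetest.NewInMemoryExporter()
	tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(exporter))
	defer tp.Shutdown(ctx)
	otel.SetTracerProvider(tp)

	// Opt in to the experimental Pub/Sub tracing added by this change.
	client, err := pubsub.NewClientWithConfig(ctx, "my-project", &pubsub.ClientConfig{
		EnableOpenTelemetryTracing: true,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	topic := client.Topic("my-topic") // assumed to exist
	defer topic.Stop()
	res := topic.Publish(ctx, &pubsub.Message{Data: []byte("hello")})
	if _, err := res.Get(ctx); err != nil {
		log.Fatal(err)
	}

	// With the span names defined in trace.go, this publish emits
	// "my-topic create", "publisher flow control", "publisher batching",
	// and "my-topic publish" spans.
	fmt.Printf("exported %d spans\n", len(exporter.GetSpans()))
}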