Skip to content

Commit

Permalink
Add supporting features to enable distributed tracing (#20301) (#20708)
Browse files Browse the repository at this point in the history
* Add supporting features to enable distributed tracing

This includes new internal pipeline policies and other supporting types.
See the changelog for a full description.
Added some missing doc comments.

* fix linter issue

* add net.peer.name trace attribute

sequence custom HTTP header policy before logging policy.
sequence logging policy after HTTP trace policy.
keep body download policy at the end.

* add span for iterating over pages
  • Loading branch information
jhendrixMSFT authored May 9, 2023
1 parent 132a01a commit 8db51ca
Show file tree
Hide file tree
Showing 19 changed files with 677 additions and 32 deletions.
8 changes: 8 additions & 0 deletions sdk/azcore/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@
## 1.6.1 (Unreleased)

### Features Added
* Added supporting features to enable distributed tracing.
* Added func `runtime.StartSpan()` for use by SDKs to start spans.
* Added method `WithContext()` to `runtime.Request` to support shallow cloning with a new context.
* Added field `TracingNamespace` to `runtime.PipelineOptions`.
* Added field `Tracer` to `runtime.NewPollerOptions` and `runtime.NewPollerFromResumeTokenOptions` types.
* Added field `SpanFromContext` to `tracing.TracerOptions`.
* Added methods `Enabled()`, `SetAttributes()`, and `SpanFromContext()` to `tracing.Tracer`.
* Added supporting pipeline policies to include HTTP spans when creating clients.

### Breaking Changes

Expand Down
3 changes: 2 additions & 1 deletion sdk/azcore/arm/runtime/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
armpolicy "github.com/Azure/azure-sdk-for-go/sdk/azcore/arm/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/cloud"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/internal/exported"
azpolicy "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
)
Expand All @@ -34,7 +35,7 @@ func NewPipeline(module, version string, cred azcore.TokenCredential, plOpts azr
})
perRetry := make([]azpolicy.Policy, len(plOpts.PerRetry), len(plOpts.PerRetry)+1)
copy(perRetry, plOpts.PerRetry)
plOpts.PerRetry = append(perRetry, authPolicy)
plOpts.PerRetry = append(perRetry, authPolicy, exported.PolicyFunc(httpTraceNamespacePolicy))
if !options.DisableRPRegistration {
regRPOpts := armpolicy.RegistrationOptions{ClientOptions: options.ClientOptions}
regPolicy, err := NewRPRegistrationPolicy(cred, &regRPOpts)
Expand Down
31 changes: 31 additions & 0 deletions sdk/azcore/arm/runtime/policy_trace_namespace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//go:build go1.18
// +build go1.18

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package runtime

import (
"net/http"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/arm/internal/resource"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/internal/shared"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/tracing"
)

// httpTraceNamespacePolicy is a policy that adds the az.namespace attribute to the current Span
func httpTraceNamespacePolicy(req *policy.Request) (resp *http.Response, err error) {
rawTracer := req.Raw().Context().Value(shared.CtxWithTracingTracer{})
if tracer, ok := rawTracer.(tracing.Tracer); ok {
rt, err := resource.ParseResourceType(req.Raw().URL.Path)
if err == nil {
// add the namespace attribute to the current span
if span, ok := tracer.SpanFromContext(req.Raw().Context()); ok {
span.SetAttributes(tracing.Attribute{Key: "az.namespace", Value: rt.Namespace})
}
}
}
return req.Next()
}
97 changes: 97 additions & 0 deletions sdk/azcore/arm/runtime/policy_trace_namespace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//go:build go1.18
// +build go1.18

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package runtime

import (
"context"
"net/http"
"testing"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/internal/shared"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/tracing"
"github.com/Azure/azure-sdk-for-go/sdk/internal/mock"
"github.com/stretchr/testify/require"
)

func TestHTTPTraceNamespacePolicy(t *testing.T) {
srv, close := mock.NewServer()
defer close()

pl := exported.NewPipeline(srv, exported.PolicyFunc(httpTraceNamespacePolicy))

// no tracer
req, err := exported.NewRequest(context.Background(), http.MethodGet, srv.URL())
require.NoError(t, err)
srv.AppendResponse()
_, err = pl.Do(req)
require.NoError(t, err)

// wrong tracer type
req, err = exported.NewRequest(context.WithValue(context.Background(), shared.CtxWithTracingTracer{}, 0), http.MethodGet, srv.URL())
require.NoError(t, err)
srv.AppendResponse()
_, err = pl.Do(req)
require.NoError(t, err)

// no SpanFromContext impl
tr := tracing.NewTracer(func(ctx context.Context, spanName string, options *tracing.SpanOptions) (context.Context, tracing.Span) {
return ctx, tracing.Span{}
}, nil)
req, err = exported.NewRequest(context.WithValue(context.Background(), shared.CtxWithTracingTracer{}, tr), http.MethodGet, srv.URL())
require.NoError(t, err)
srv.AppendResponse()
_, err = pl.Do(req)
require.NoError(t, err)

// failed to parse resource ID, shouldn't call SetAttributes
var attrString string
tr = tracing.NewTracer(func(ctx context.Context, spanName string, options *tracing.SpanOptions) (context.Context, tracing.Span) {
return ctx, tracing.Span{}
}, &tracing.TracerOptions{
SpanFromContext: func(ctx context.Context) (tracing.Span, bool) {
spanImpl := tracing.SpanImpl{
SetAttributes: func(a ...tracing.Attribute) {
require.Len(t, a, 1)
v, ok := a[0].Value.(string)
require.True(t, ok)
attrString = a[0].Key + ":" + v
},
}
return tracing.NewSpan(spanImpl), true
},
})
req, err = exported.NewRequest(context.WithValue(context.Background(), shared.CtxWithTracingTracer{}, tr), http.MethodGet, srv.URL())
require.NoError(t, err)
srv.AppendResponse()
_, err = pl.Do(req)
require.NoError(t, err)
require.Empty(t, attrString)

// success
tr = tracing.NewTracer(func(ctx context.Context, spanName string, options *tracing.SpanOptions) (context.Context, tracing.Span) {
return ctx, tracing.Span{}
}, &tracing.TracerOptions{
SpanFromContext: func(ctx context.Context) (tracing.Span, bool) {
spanImpl := tracing.SpanImpl{
SetAttributes: func(a ...tracing.Attribute) {
require.Len(t, a, 1)
v, ok := a[0].Value.(string)
require.True(t, ok)
attrString = a[0].Key + ":" + v
},
}
return tracing.NewSpan(spanImpl), true
},
})
req, err = exported.NewRequest(context.WithValue(context.Background(), shared.CtxWithTracingTracer{}, tr), http.MethodGet, srv.URL()+requestEndpoint)
require.NoError(t, err)
srv.AppendResponse()
_, err = pl.Do(req)
require.NoError(t, err)
require.EqualValues(t, "az.namespace:Microsoft.Storage", attrString)
}
3 changes: 3 additions & 0 deletions sdk/azcore/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ func NewClient(clientName, moduleVersion string, plOpts runtime.PipelineOptions,
pl := runtime.NewPipeline(pkg, moduleVersion, plOpts, options)

tr := options.TracingProvider.NewTracer(clientName, moduleVersion)
if tr.Enabled() && plOpts.TracingNamespace != "" {
tr.SetAttributes(tracing.Attribute{Key: "az.namespace", Value: plOpts.TracingNamespace})
}
return &Client{pl: pl, tr: tr}, nil
}

Expand Down
41 changes: 41 additions & 0 deletions sdk/azcore/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@
package azcore

import (
"context"
"net/http"
"reflect"
"testing"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/internal/shared"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/tracing"
"github.com/Azure/azure-sdk-for-go/sdk/internal/mock"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -131,3 +137,38 @@ func TestNewClientError(t *testing.T) {
require.Error(t, err)
require.Nil(t, client)
}

func TestNewClientTracingEnabled(t *testing.T) {
srv, close := mock.NewServer()
defer close()

var attrString string
client, err := NewClient("package.Client", "v1.0.0", runtime.PipelineOptions{TracingNamespace: "Widget.Factory"}, &policy.ClientOptions{
TracingProvider: tracing.NewProvider(func(name, version string) tracing.Tracer {
return tracing.NewTracer(func(ctx context.Context, spanName string, options *tracing.SpanOptions) (context.Context, tracing.Span) {
require.NotNil(t, options)
for _, attr := range options.Attributes {
if attr.Key == "az.namespace" {
v, ok := attr.Value.(string)
require.True(t, ok)
attrString = attr.Key + ":" + v
}
}
return ctx, tracing.Span{}
}, nil)
}, nil),
Transport: srv,
})
require.NoError(t, err)
require.NotNil(t, client)
require.NotZero(t, client.Pipeline())
require.NotZero(t, client.Tracer())

const requestEndpoint = "/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/fakeResourceGroupo/providers/Microsoft.Storage/storageAccounts/fakeAccountName"
req, err := exported.NewRequest(context.WithValue(context.Background(), shared.CtxWithTracingTracer{}, client.Tracer()), http.MethodGet, srv.URL()+requestEndpoint)
require.NoError(t, err)
srv.AppendResponse()
_, err = client.Pipeline().Do(req)
require.NoError(t, err)
require.EqualValues(t, "az.namespace:Widget.Factory", attrString)
}
8 changes: 8 additions & 0 deletions sdk/azcore/internal/exported/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,14 @@ func (req *Request) Clone(ctx context.Context) *Request {
return &r2
}

// WithContext returns a shallow copy of the request with its context changed to ctx.
func (req *Request) WithContext(ctx context.Context) *Request {
r2 := new(Request)
*r2 = *req
r2.req = r2.req.WithContext(ctx)
return r2
}

// not exported but dependent on Request

// PolicyFunc is a type that implements the Policy interface.
Expand Down
17 changes: 17 additions & 0 deletions sdk/azcore/internal/exported/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,20 @@ func TestNewRequestFail(t *testing.T) {
t.Fatal("unexpected request")
}
}

func TestRequestWithContext(t *testing.T) {
type ctxKey1 struct{}
type ctxKey2 struct{}

req1, err := NewRequest(context.WithValue(context.Background(), ctxKey1{}, 1), http.MethodPost, testURL)
require.NoError(t, err)
require.NotNil(t, req1.Raw().Context().Value(ctxKey1{}))

req2 := req1.WithContext(context.WithValue(context.Background(), ctxKey2{}, 1))
require.Nil(t, req2.Raw().Context().Value(ctxKey1{}))
require.NotNil(t, req2.Raw().Context().Value(ctxKey2{}))

// shallow copy, so changing req2 affects req1
req2.Raw().Header.Add("added-req2", "value")
require.EqualValues(t, "value", req1.Raw().Header.Get("added-req2"))
}
1 change: 1 addition & 0 deletions sdk/azcore/internal/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
HeaderUserAgent = "User-Agent"
HeaderWWWAuthenticate = "WWW-Authenticate"
HeaderXMSClientRequestID = "x-ms-client-request-id"
HeaderXMSRequestID = "x-ms-request-id"
)

const BearerTokenPrefix = "Bearer "
Expand Down
3 changes: 3 additions & 0 deletions sdk/azcore/internal/shared/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type CtxWithRetryOptionsKey struct{}
// CtxIncludeResponseKey is used as a context key for retrieving the raw response.
type CtxIncludeResponseKey struct{}

// CtxWithTracingTracer is used as a context key for adding/retrieving tracing.Tracer.
type CtxWithTracingTracer struct{}

// Delay waits for the duration to elapse or the context to be cancelled.
func Delay(ctx context.Context, delay time.Duration) error {
select {
Expand Down
3 changes: 2 additions & 1 deletion sdk/azcore/policy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type Request = exported.Request
// ClientOptions contains optional settings for a client's pipeline.
// All zero-value fields will be initialized with default values.
type ClientOptions struct {
// APIVersion overrides the default version requested of the service. Set with caution as this package version has not been tested with arbitrary service versions.
// APIVersion overrides the default version requested of the service.
// Set with caution as this package version has not been tested with arbitrary service versions.
APIVersion string

// Cloud specifies a cloud for the client. The default is Azure Public Cloud.
Expand Down
13 changes: 13 additions & 0 deletions sdk/azcore/runtime/pager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/tracing"
)

// PagingHandler contains the required data for constructing a Pager.
Expand All @@ -20,12 +24,16 @@ type PagingHandler[T any] struct {

// Fetcher fetches the first and subsequent pages.
Fetcher func(context.Context, *T) (T, error)

// Tracer contains the Tracer from the client that's creating the Pager.
Tracer tracing.Tracer
}

// Pager provides operations for iterating over paged responses.
type Pager[T any] struct {
current *T
handler PagingHandler[T]
tracer tracing.Tracer
firstPage bool
}

Expand All @@ -34,6 +42,7 @@ type Pager[T any] struct {
func NewPager[T any](handler PagingHandler[T]) *Pager[T] {
return &Pager[T]{
handler: handler,
tracer: handler.Tracer,
firstPage: true,
}
}
Expand All @@ -58,10 +67,14 @@ func (p *Pager[T]) NextPage(ctx context.Context) (T, error) {
} else if !p.handler.More(*p.current) {
return *new(T), errors.New("no more pages")
}
ctx, endSpan := StartSpan(ctx, fmt.Sprintf("%s.NextPage", shortenTypeName(reflect.TypeOf(*p).Name())), p.tracer, nil)
defer endSpan(err)
resp, err = p.handler.Fetcher(ctx, p.current)
} else {
// non-LRO case, first page
p.firstPage = false
ctx, endSpan := StartSpan(ctx, fmt.Sprintf("%s.NextPage", shortenTypeName(reflect.TypeOf(*p).Name())), p.tracer, nil)
defer endSpan(err)
resp, err = p.handler.Fetcher(ctx, nil)
}
if err != nil {
Expand Down
30 changes: 26 additions & 4 deletions sdk/azcore/runtime/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,29 @@ import (

// PipelineOptions contains Pipeline options for SDK developers
type PipelineOptions struct {
AllowedHeaders, AllowedQueryParameters []string
APIVersion APIVersionOptions
PerCall, PerRetry []policy.Policy
// AllowedHeaders is the slice of headers to log with their values intact.
// All headers not in the slice will have their values REDACTED.
// Applies to request and response headers.
AllowedHeaders []string

// AllowedQueryParameters is the slice of query parameters to log with their values intact.
// All query parameters not in the slice will have their values REDACTED.
AllowedQueryParameters []string

// APIVersion overrides the default version requested of the service.
// Set with caution as this package version has not been tested with arbitrary service versions.
APIVersion APIVersionOptions

// PerCall contains custom policies to inject into the pipeline.
// Each policy is executed once per request.
PerCall []policy.Policy

// PerRetry contains custom policies to inject into the pipeline.
// Each policy is executed once per request, and for each retry of that request.
PerRetry []policy.Policy

// TracingNamespace contains the value to use for the az.namespace span attribute.
TracingNamespace string
}

// Pipeline represents a primitive for sending HTTP requests and receiving responses.
Expand Down Expand Up @@ -56,8 +76,10 @@ func NewPipeline(module, version string, plOpts PipelineOptions, options *policy
policies = append(policies, NewRetryPolicy(&cp.Retry))
policies = append(policies, plOpts.PerRetry...)
policies = append(policies, cp.PerRetryPolicies...)
policies = append(policies, exported.PolicyFunc(httpHeaderPolicy))
policies = append(policies, newHTTPTracePolicy(cp.Logging.AllowedQueryParams))
policies = append(policies, NewLogPolicy(&cp.Logging))
policies = append(policies, exported.PolicyFunc(httpHeaderPolicy), exported.PolicyFunc(bodyDownloadPolicy))
policies = append(policies, exported.PolicyFunc(bodyDownloadPolicy))
transport := cp.Transport
if transport == nil {
transport = defaultHTTPClient
Expand Down
Loading

0 comments on commit 8db51ca

Please sign in to comment.