Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cosmos DB: Add support for Span #23268

Merged
merged 30 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/data/azcosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 1.0.4 (Unreleased)

### Features Added
* Added support for OpenTelemetry trace spans. See [PR 23268](https://github.com/Azure/azure-sdk-for-go/pull/23268)

### Breaking Changes

Expand Down
76 changes: 58 additions & 18 deletions sdk/data/azcosmos/cosmos_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ const (

// Client is used to interact with the Azure Cosmos DB database service.
type Client struct {
endpoint string
pipeline azruntime.Pipeline
gem *globalEndpointManager
endpoint string
internal *azcore.Client
gem *globalEndpointManager
endpointUrl *url.URL
}

// Endpoint used to create the client.
Expand All @@ -42,24 +43,38 @@ func (c *Client) Endpoint() string {
// cred - The credential used to authenticate with the cosmos service.
// options - Optional Cosmos client options. Pass nil to accept default values.
func NewClientWithKey(endpoint string, cred KeyCredential, o *ClientOptions) (*Client, error) {
endpointUrl, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
preferredRegions := []string{}
enableCrossRegionRetries := true
if o != nil {
preferredRegions = o.PreferredRegions
}

gem, err := newGlobalEndpointManager(endpoint, newInternalPipeline(newSharedKeyCredPolicy(cred), o), preferredRegions, 0, enableCrossRegionRetries)
if err != nil {
return nil, err
}
return &Client{endpoint: endpoint, pipeline: newPipeline(newSharedKeyCredPolicy(cred), gem, o), gem: gem}, nil

internalClient, err := newClient(newSharedKeyCredPolicy(cred), gem, o)
if err != nil {
return nil, err
}
return &Client{endpoint: endpoint, endpointUrl: endpointUrl, internal: internalClient, gem: gem}, nil
}

// NewClient creates a new instance of Cosmos client with Azure AD access token authentication. It uses the default pipeline configuration.
// endpoint - The cosmos service endpoint to use.
// cred - The credential used to authenticate with the cosmos service.
// options - Optional Cosmos client options. Pass nil to accept default values.
func NewClient(endpoint string, cred azcore.TokenCredential, o *ClientOptions) (*Client, error) {
scope, err := createScopeFromEndpoint(endpoint)
endpointUrl, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
scope, err := createScopeFromEndpoint(endpointUrl)
if err != nil {
return nil, err
}
Expand All @@ -72,7 +87,12 @@ func NewClient(endpoint string, cred azcore.TokenCredential, o *ClientOptions) (
if err != nil {
return nil, err
}
return &Client{endpoint: endpoint, pipeline: newPipeline(newCosmosBearerTokenPolicy(cred, scope, nil), gem, o), gem: gem}, nil

internalClient, err := newClient(newCosmosBearerTokenPolicy(cred, scope, nil), gem, o)
if err != nil {
return nil, err
}
return &Client{endpoint: endpoint, endpointUrl: endpointUrl, internal: internalClient, gem: gem}, nil
}

// NewClientFromConnectionString creates a new instance of Cosmos client from connection string. It uses the default pipeline configuration.
Expand Down Expand Up @@ -111,11 +131,11 @@ func NewClientFromConnectionString(connectionString string, o *ClientOptions) (*
return NewClientWithKey(endpoint, cred, o)
}

func newPipeline(authPolicy policy.Policy, gem *globalEndpointManager, options *ClientOptions) azruntime.Pipeline {
func newClient(authPolicy policy.Policy, gem *globalEndpointManager, options *ClientOptions) (*azcore.Client, error) {
if options == nil {
options = &ClientOptions{}
}
return azruntime.NewPipeline("azcosmos", serviceLibVersion,
return azcore.NewClient(moduleName, serviceLibVersion,
azruntime.PipelineOptions{
AllowedHeaders: getAllowedHeaders(),
PerCall: []policy.Policy{
Expand All @@ -128,6 +148,9 @@ func newPipeline(authPolicy policy.Policy, gem *globalEndpointManager, options *
authPolicy,
&clientRetryPolicy{gem: gem},
},
Tracing: azruntime.TracingOptions{
Namespace: "Microsoft.DocumentDB",
},
},
&options.ClientOptions)
}
Expand All @@ -136,7 +159,7 @@ func newInternalPipeline(authPolicy policy.Policy, options *ClientOptions) azrun
if options == nil {
options = &ClientOptions{}
}
return azruntime.NewPipeline("azcosmos", serviceLibVersion,
return azruntime.NewPipeline(moduleName, serviceLibVersion,
azruntime.PipelineOptions{
AllowedHeaders: getAllowedHeaders(),
PerRetry: []policy.Policy{
Expand All @@ -146,13 +169,8 @@ func newInternalPipeline(authPolicy policy.Policy, options *ClientOptions) azrun
&options.ClientOptions)
}

func createScopeFromEndpoint(endpoint string) ([]string, error) {
u, err := url.Parse(endpoint)
if err != nil {
return nil, err
}

return []string{fmt.Sprintf("%s://%s/.default", u.Scheme, u.Hostname())}, nil
func createScopeFromEndpoint(endpoint *url.URL) ([]string, error) {
return []string{fmt.Sprintf("%s://%s/.default", endpoint.Scheme, endpoint.Hostname())}, nil
}

// NewDatabase returns a struct that represents a database and allows database level operations.
Expand Down Expand Up @@ -193,6 +211,14 @@ func (c *Client) CreateDatabase(
ctx context.Context,
databaseProperties DatabaseProperties,
o *CreateDatabaseOptions) (DatabaseResponse, error) {
var err error
spanName, err := getSpanNameForDatabases(c.accountEndpointUrl(), operationTypeCreate, resourceTypeDatabase, databaseProperties.ID)
if err != nil {
return DatabaseResponse{}, err
}
ctx, endSpan := azruntime.StartSpan(ctx, spanName.name, c.internal.Tracer(), &spanName.options)
defer func() { endSpan(err) }()

if o == nil {
o = &CreateDatabaseOptions{}
}
Expand Down Expand Up @@ -224,7 +250,8 @@ func (c *Client) CreateDatabase(
return DatabaseResponse{}, err
}

return newDatabaseResponse(azResponse)
response, err := newDatabaseResponse(azResponse)
return response, err
}

// NewQueryDatabasesPager executes query for databases.
Expand All @@ -249,6 +276,14 @@ func (c *Client) NewQueryDatabasesPager(query string, o *QueryDatabasesOptions)
return page.ContinuationToken != nil
},
Fetcher: func(ctx context.Context, page *QueryDatabasesResponse) (QueryDatabasesResponse, error) {
var err error
// Move the span to the pager once https://github.com/Azure/azure-sdk-for-go/issues/23294 is fixed
spanName, err := getSpanNameForClient(c.accountEndpointUrl(), operationTypeQuery, resourceTypeDatabase, c.accountEndpointUrl().Hostname())
if err != nil {
return QueryDatabasesResponse{}, err
}
ctx, endSpan := azruntime.StartSpan(ctx, spanName.name, c.internal.Tracer(), &spanName.options)
defer func() { endSpan(err) }()
ealsur marked this conversation as resolved.
Show resolved Hide resolved
if page != nil {
if page.ContinuationToken != nil {
// Use the previous page continuation if available
Expand All @@ -271,6 +306,7 @@ func (c *Client) NewQueryDatabasesPager(query string, o *QueryDatabasesOptions)

return newDatabasesQueryResponse(azResponse)
},
Tracer: c.internal.Tracer(),
})
}

Expand Down Expand Up @@ -473,7 +509,7 @@ func (c *Client) attachContent(content interface{}, req *policy.Request) error {

func (c *Client) executeAndEnsureSuccessResponse(request *policy.Request) (*http.Response, error) {
log.Write(azlog.EventResponse, fmt.Sprintf("\n===== Client preferred regions:\n%v\n=====\n", c.gem.preferredLocations))
response, err := c.pipeline.Do(request)
response, err := c.internal.Pipeline().Do(request)
if err != nil {
return nil, err
}
Expand All @@ -486,6 +522,10 @@ func (c *Client) executeAndEnsureSuccessResponse(request *policy.Request) (*http
return nil, azruntime.NewResponseErrorWithErrorCode(response, response.Status)
}

func (c *Client) accountEndpointUrl() *url.URL {
return c.endpointUrl
}

type pipelineRequestOptions struct {
headerOptionsOverride *headerOptionsOverride
resourceType resourceType
Expand Down
29 changes: 15 additions & 14 deletions sdk/data/azcosmos/cosmos_client_retry_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/internal/mock"
Expand Down Expand Up @@ -43,7 +44,7 @@ func TestSessionNotAvailableSingleMaster(t *testing.T) {
retryPolicy := &clientRetryPolicy{gem: gem}
verifier := clientRetryPolicyVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})

// Setting up responses for consistent failures
srv.AppendResponse(
Expand All @@ -53,7 +54,7 @@ func TestSessionNotAvailableSingleMaster(t *testing.T) {
mock.WithHeader("x-ms-substatus", "1002"),
mock.WithStatusCode(404))

client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
client := &Client{endpoint: srv.URL(), endpointUrl: defaultEndpoint, internal: internalClient, gem: gem}
db, _ := client.NewDatabase("database_id")
container, _ := db.NewContainer("container_id")
_, err = container.ReadItem(context.TODO(), NewPartitionKeyString("1"), "doc1", nil)
Expand Down Expand Up @@ -130,7 +131,7 @@ func TestSessionNotAvailableMultiMaster(t *testing.T) {
retryPolicy := &clientRetryPolicy{gem: gem}
verifier := clientRetryPolicyVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})

// Setting up responses for using all retries and failing
srv.AppendResponse(
Expand All @@ -146,7 +147,7 @@ func TestSessionNotAvailableMultiMaster(t *testing.T) {
mock.WithHeader("x-ms-substatus", "1002"),
mock.WithStatusCode(404))

client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
client := &Client{endpoint: srv.URL(), endpointUrl: defaultEndpoint, internal: internalClient, gem: gem}
db, _ := client.NewDatabase("database_id")
container, _ := db.NewContainer("container_id")
_, err = container.ReadItem(context.TODO(), NewPartitionKeyString("1"), "doc1", nil)
Expand Down Expand Up @@ -238,7 +239,7 @@ func TestReadEndpointFailure(t *testing.T) {
retryPolicy := &clientRetryPolicy{gem: gem}
verifier := clientRetryPolicyVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})

// Setting up responses for retrying twice
srv.AppendResponse(
Expand All @@ -250,7 +251,7 @@ func TestReadEndpointFailure(t *testing.T) {
srv.AppendResponse(
mock.WithStatusCode(200))

client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
client := &Client{endpoint: srv.URL(), endpointUrl: defaultEndpoint, internal: internalClient, gem: gem}
db, _ := client.NewDatabase("database_id")
container, _ := db.NewContainer("container_id")
_, err = container.ReadItem(context.TODO(), NewPartitionKeyString("1"), "doc1", nil)
Expand Down Expand Up @@ -291,9 +292,9 @@ func TestWriteEndpointFailure(t *testing.T) {
retryPolicy := &clientRetryPolicy{gem: gem}
verifier := clientRetryPolicyVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})

client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
client := &Client{endpoint: srv.URL(), endpointUrl: defaultEndpoint, internal: internalClient, gem: gem}
db, _ := client.NewDatabase("database_id")
container, _ := db.NewContainer("container_id")

Expand Down Expand Up @@ -354,9 +355,9 @@ func TestReadServiceUnavailable(t *testing.T) {
retryPolicy := &clientRetryPolicy{gem: gem}
verifier := clientRetryPolicyVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})

client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
client := &Client{endpoint: srv.URL(), endpointUrl: defaultEndpoint, internal: internalClient, gem: gem}
db, _ := client.NewDatabase("database_id")
container, _ := db.NewContainer("container_id")

Expand Down Expand Up @@ -427,9 +428,9 @@ func TestWriteServiceUnavailable(t *testing.T) {
retryPolicy := &clientRetryPolicy{gem: gem}
verifier := clientRetryPolicyVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})

client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
client := &Client{endpoint: srv.URL(), endpointUrl: defaultEndpoint, internal: internalClient, gem: gem}
db, _ := client.NewDatabase("database_id")
container, _ := db.NewContainer("container_id")

Expand Down Expand Up @@ -506,9 +507,9 @@ func TestDnsErrorRetry(t *testing.T) {
retryPolicy := &clientRetryPolicy{gem: gem}
verifier := clientRetryPolicyVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})

client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
client := &Client{endpoint: srv.URL(), endpointUrl: defaultEndpoint, internal: internalClient, gem: gem}
db, _ := client.NewDatabase("database_id")
container, _ := db.NewContainer("container_id")

Expand Down
Loading