Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cosmos DB: Add support for Span #23268

Merged
merged 30 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/data/azcosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 1.0.4 (Unreleased)

### Features Added
* Added support for tracing. See [PR 23268](https://github.com/Azure/azure-sdk-for-go/pull/23268)
ealsur marked this conversation as resolved.
Show resolved Hide resolved

### Breaking Changes

Expand Down
31 changes: 23 additions & 8 deletions sdk/data/azcosmos/cosmos_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const (
// Client is used to interact with the Azure Cosmos DB database service.
type Client struct {
endpoint string
pipeline azruntime.Pipeline
internal *azcore.Client
gem *globalEndpointManager
}

Expand All @@ -51,7 +51,12 @@ func NewClientWithKey(endpoint string, cred KeyCredential, o *ClientOptions) (*C
if err != nil {
return nil, err
}
return &Client{endpoint: endpoint, pipeline: newPipeline(newSharedKeyCredPolicy(cred), gem, o), gem: gem}, nil

internalClient, err := newClient(newSharedKeyCredPolicy(cred), gem, o)
if err != nil {
return nil, err
}
return &Client{endpoint: endpoint, internal: internalClient, gem: gem}, nil
}

// NewClient creates a new instance of Cosmos client with Azure AD access token authentication. It uses the default pipeline configuration.
Expand All @@ -72,7 +77,12 @@ func NewClient(endpoint string, cred azcore.TokenCredential, o *ClientOptions) (
if err != nil {
return nil, err
}
return &Client{endpoint: endpoint, pipeline: newPipeline(newCosmosBearerTokenPolicy(cred, scope, nil), gem, o), gem: gem}, nil

internalClient, err := newClient(newCosmosBearerTokenPolicy(cred, scope, nil), gem, o)
if err != nil {
return nil, err
}
return &Client{endpoint: endpoint, internal: internalClient, gem: gem}, nil
}

// NewClientFromConnectionString creates a new instance of Cosmos client from connection string. It uses the default pipeline configuration.
Expand Down Expand Up @@ -111,11 +121,11 @@ func NewClientFromConnectionString(connectionString string, o *ClientOptions) (*
return NewClientWithKey(endpoint, cred, o)
}

func newPipeline(authPolicy policy.Policy, gem *globalEndpointManager, options *ClientOptions) azruntime.Pipeline {
func newClient(authPolicy policy.Policy, gem *globalEndpointManager, options *ClientOptions) (*azcore.Client, error) {
if options == nil {
options = &ClientOptions{}
}
return azruntime.NewPipeline("azcosmos", serviceLibVersion,
return azcore.NewClient(moduleName, serviceLibVersion,
azruntime.PipelineOptions{
AllowedHeaders: getAllowedHeaders(),
PerCall: []policy.Policy{
Expand All @@ -136,7 +146,7 @@ func newInternalPipeline(authPolicy policy.Policy, options *ClientOptions) azrun
if options == nil {
options = &ClientOptions{}
}
return azruntime.NewPipeline("azcosmos", serviceLibVersion,
return azruntime.NewPipeline(moduleName, serviceLibVersion,
azruntime.PipelineOptions{
AllowedHeaders: getAllowedHeaders(),
PerRetry: []policy.Policy{
Expand Down Expand Up @@ -193,6 +203,10 @@ func (c *Client) CreateDatabase(
ctx context.Context,
databaseProperties DatabaseProperties,
o *CreateDatabaseOptions) (DatabaseResponse, error) {
var err error
ctx, endSpan := azruntime.StartSpan(ctx, "Client.CreateDatabase", c.internal.Tracer(), nil)
defer func() { endSpan(err) }()

if o == nil {
o = &CreateDatabaseOptions{}
}
Expand Down Expand Up @@ -224,7 +238,8 @@ func (c *Client) CreateDatabase(
return DatabaseResponse{}, err
}

return newDatabaseResponse(azResponse)
response, err := newDatabaseResponse(azResponse)
return response, err
}

// NewQueryDatabasesPager executes query for databases.
Expand Down Expand Up @@ -473,7 +488,7 @@ func (c *Client) attachContent(content interface{}, req *policy.Request) error {

func (c *Client) executeAndEnsureSuccessResponse(request *policy.Request) (*http.Response, error) {
log.Write(azlog.EventResponse, fmt.Sprintf("\n===== Client preferred regions:\n%v\n=====\n", c.gem.preferredLocations))
response, err := c.pipeline.Do(request)
response, err := c.internal.Pipeline().Do(request)
if err != nil {
return nil, err
}
Expand Down
29 changes: 15 additions & 14 deletions sdk/data/azcosmos/cosmos_client_retry_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/internal/mock"
Expand Down Expand Up @@ -43,7 +44,7 @@ func TestSessionNotAvailableSingleMaster(t *testing.T) {
retryPolicy := &clientRetryPolicy{gem: gem}
verifier := clientRetryPolicyVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})

// Setting up responses for consistent failures
srv.AppendResponse(
Expand All @@ -53,7 +54,7 @@ func TestSessionNotAvailableSingleMaster(t *testing.T) {
mock.WithHeader("x-ms-substatus", "1002"),
mock.WithStatusCode(404))

client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
client := &Client{endpoint: srv.URL(), internal: internalClient, gem: gem}
db, _ := client.NewDatabase("database_id")
container, _ := db.NewContainer("container_id")
_, err = container.ReadItem(context.TODO(), NewPartitionKeyString("1"), "doc1", nil)
Expand Down Expand Up @@ -130,7 +131,7 @@ func TestSessionNotAvailableMultiMaster(t *testing.T) {
retryPolicy := &clientRetryPolicy{gem: gem}
verifier := clientRetryPolicyVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})

// Setting up responses for using all retries and failing
srv.AppendResponse(
Expand All @@ -146,7 +147,7 @@ func TestSessionNotAvailableMultiMaster(t *testing.T) {
mock.WithHeader("x-ms-substatus", "1002"),
mock.WithStatusCode(404))

client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
client := &Client{endpoint: srv.URL(), internal: internalClient, gem: gem}
db, _ := client.NewDatabase("database_id")
container, _ := db.NewContainer("container_id")
_, err = container.ReadItem(context.TODO(), NewPartitionKeyString("1"), "doc1", nil)
Expand Down Expand Up @@ -238,7 +239,7 @@ func TestReadEndpointFailure(t *testing.T) {
retryPolicy := &clientRetryPolicy{gem: gem}
verifier := clientRetryPolicyVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})

// Setting up responses for retrying twice
srv.AppendResponse(
Expand All @@ -250,7 +251,7 @@ func TestReadEndpointFailure(t *testing.T) {
srv.AppendResponse(
mock.WithStatusCode(200))

client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
client := &Client{endpoint: srv.URL(), internal: internalClient, gem: gem}
db, _ := client.NewDatabase("database_id")
container, _ := db.NewContainer("container_id")
_, err = container.ReadItem(context.TODO(), NewPartitionKeyString("1"), "doc1", nil)
Expand Down Expand Up @@ -291,9 +292,9 @@ func TestWriteEndpointFailure(t *testing.T) {
retryPolicy := &clientRetryPolicy{gem: gem}
verifier := clientRetryPolicyVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})

client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
client := &Client{endpoint: srv.URL(), internal: internalClient, gem: gem}
db, _ := client.NewDatabase("database_id")
container, _ := db.NewContainer("container_id")

Expand Down Expand Up @@ -354,9 +355,9 @@ func TestReadServiceUnavailable(t *testing.T) {
retryPolicy := &clientRetryPolicy{gem: gem}
verifier := clientRetryPolicyVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})

client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
client := &Client{endpoint: srv.URL(), internal: internalClient, gem: gem}
db, _ := client.NewDatabase("database_id")
container, _ := db.NewContainer("container_id")

Expand Down Expand Up @@ -427,9 +428,9 @@ func TestWriteServiceUnavailable(t *testing.T) {
retryPolicy := &clientRetryPolicy{gem: gem}
verifier := clientRetryPolicyVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})

client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
client := &Client{endpoint: srv.URL(), internal: internalClient, gem: gem}
db, _ := client.NewDatabase("database_id")
container, _ := db.NewContainer("container_id")

Expand Down Expand Up @@ -506,9 +507,9 @@ func TestDnsErrorRetry(t *testing.T) {
retryPolicy := &clientRetryPolicy{gem: gem}
verifier := clientRetryPolicyVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})
internalClient, _ := azcore.NewClient("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerRetry: []policy.Policy{&verifier, retryPolicy}}, &policy.ClientOptions{Transport: srv})

client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
client := &Client{endpoint: srv.URL(), internal: internalClient, gem: gem}
db, _ := client.NewDatabase("database_id")
container, _ := db.NewContainer("container_id")

Expand Down
Loading