Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding in track2 SDK support for CosmosDB Bindings #1876

Merged
merged 5 commits into from
Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 41 additions & 84 deletions bindings/azure/cosmosdb/cosmosdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import (
"strings"
"time"

"github.com/a8m/documentdb"
backoff "github.com/cenkalti/backoff/v4"
"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"

"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/internal/authentication/azure"
Expand All @@ -30,8 +29,7 @@ import (

// CosmosDB allows performing state operations on collections.
type CosmosDB struct {
client *documentdb.DocumentDB
collection string
client *azcosmos.ContainerClient
partitionKey string

logger logger.Logger
Expand All @@ -45,7 +43,8 @@ type cosmosDBCredentials struct {
PartitionKey string `json:"partitionKey"`
}

const statusTooManyRequests = "429" // RFC 6585, 4
// Value used for timeout durations
const timeoutValue = 30

// NewCosmosDB returns a new CosmosDB instance.
func NewCosmosDB(logger logger.Logger) *CosmosDB {
Expand All @@ -62,57 +61,43 @@ func (c *CosmosDB) Init(metadata bindings.Metadata) error {
c.partitionKey = m.PartitionKey

// Create the client; first, try authenticating with a master key, if present
var config *documentdb.Config
var client *azcosmos.Client
if m.MasterKey != "" {
config = documentdb.NewConfig(&documentdb.Key{
Key: m.MasterKey,
})
cred, keyErr := azcosmos.NewKeyCredential(m.MasterKey)
if keyErr != nil {
return keyErr
}
client, err = azcosmos.NewClientWithKey(m.URL, cred, nil)
if err != nil {
return err
}
} else {
// Fallback to using Azure AD
env, errB := azure.NewEnvironmentSettings("cosmosdb", metadata.Properties)
if errB != nil {
return errB
env, errEnv := azure.NewEnvironmentSettings("cosmosdb", metadata.Properties)
if errEnv != nil {
return errEnv
}
spt, errB := env.GetServicePrincipalToken()
if errB != nil {
return errB
token, errToken := env.GetTokenCredential()
if errToken != nil {
return errToken
}
config = documentdb.NewConfigWithServicePrincipal(spt)
}
// disable the identification hydrator (which autogenerates IDs if missing from the request)
// so we aren't forced to use a struct by the upstream SDK
// this allows us to provide the most flexibility in the request document sent to this binding
config.IdentificationHydrator = nil
config.WithAppIdentifier("dapr-" + logger.DaprVersion)

c.client = documentdb.New(m.URL, config)

// Retries initializing the client if a TooManyRequests error is encountered
err = retryOperation(func() (err error) {
berndverst marked this conversation as resolved.
Show resolved Hide resolved
collLink := fmt.Sprintf("dbs/%s/colls/%s/", m.Database, m.Collection)
coll, err := c.client.ReadCollection(collLink)
client, err = azcosmos.NewClient(m.URL, token, nil)
if err != nil {
if isTooManyRequestsError(err) {
return err
}
return backoff.Permanent(err)
} else if coll == nil || coll.Self == "" {
return backoff.Permanent(
fmt.Errorf("collection %s in database %s for CosmosDB state store not found. This must be created before Dapr uses it", m.Collection, m.Database),
)
return err
}
}

c.collection = coll.Self

return nil
}, func(err error, d time.Duration) {
c.logger.Warnf("CosmosDB binding initialization failed: %v; retrying in %s", err, d)
}, 5*time.Minute)
// Create a container client
dbContainer, err := client.NewContainer(m.Database, m.Collection)
if err != nil {
return err
}

return nil
c.client = dbContainer
ctx, cancel := context.WithTimeout(context.Background(), timeoutValue*time.Second)
_, err = c.client.Read(ctx, nil)
cancel()
return err
}

func (c *CosmosDB) parseMetadata(metadata bindings.Metadata) (*cosmosDBCredentials, error) {
Expand All @@ -135,7 +120,7 @@ func (c *CosmosDB) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}

func (c *CosmosDB) Invoke(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
func (c *CosmosDB) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
switch req.Operation {
case bindings.CreateOperation:
var obj interface{}
Expand All @@ -144,41 +129,34 @@ func (c *CosmosDB) Invoke(_ context.Context, req *bindings.InvokeRequest) (*bind
return nil, err
}

val, err := c.getPartitionKeyValue(c.partitionKey, obj)
pkString, err := c.getPartitionKeyValue(c.partitionKey, obj)
if err != nil {
return nil, err
}
pk := azcosmos.NewPartitionKeyString(pkString)

err = retryOperation(func() error {
_, innerErr := c.client.CreateDocument(c.collection, obj, documentdb.PartitionKey(val))
if innerErr != nil {
if isTooManyRequestsError(innerErr) {
return innerErr
}
return backoff.Permanent(innerErr)
}
return nil
}, func(err error, d time.Duration) {
c.logger.Warnf("CosmosDB binding Invoke request failed: %v; retrying in %s", err, d)
}, 20*time.Second)
_, err = c.client.CreateItem(ctx, pk, req.Data, nil)
if err != nil {
return nil, err
}

return nil, nil
default:
return nil, fmt.Errorf("operation kind %s not supported", req.Operation)
}
}

func (c *CosmosDB) getPartitionKeyValue(key string, obj interface{}) (interface{}, error) {
val, err := c.lookup(obj.(map[string]interface{}), strings.Split(key, "."))
func (c *CosmosDB) getPartitionKeyValue(key string, obj interface{}) (string, error) {
valI, err := c.lookup(obj.(map[string]interface{}), strings.Split(key, "."))
if err != nil {
return nil, fmt.Errorf("missing partitionKey field %s from request body - %w", c.partitionKey, err)
return "", fmt.Errorf("missing partitionKey field %s from request body - %w", c.partitionKey, err)
}
val, ok := valI.(string)
if !ok {
return "", fmt.Errorf("partition key is not a string")
}

if val == "" {
return nil, fmt.Errorf("partitionKey field %s from request body is empty", c.partitionKey)
return "", fmt.Errorf("partitionKey field %s from request body is empty", c.partitionKey)
}

return val, nil
Expand Down Expand Up @@ -209,24 +187,3 @@ func (c *CosmosDB) lookup(m map[string]interface{}, ks []string) (val interface{

return c.lookup(m, ks[1:])
}

func retryOperation(operation backoff.Operation, notify backoff.Notify, maxElapsedTime time.Duration) error {
bo := backoff.NewExponentialBackOff()
bo.InitialInterval = 2 * time.Second
bo.MaxElapsedTime = maxElapsedTime
return backoff.RetryNotify(operation, bo, notify)
}

func isTooManyRequestsError(err error) bool {
if err == nil {
return false
}

if requestError, ok := err.(*documentdb.RequestError); ok {
if requestError.Code == statusTooManyRequests {
return true
}
}

return false
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.7.1
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.0.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4Sath
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0=
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.0 h1:h/72OERa/5hgnKEOyQJ8gtJoTVX3uwHCavsraGadTZM=
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.0/go.mod h1:p74+tP95m8830ypJk53L93+BEsjTKY4SKQ75J2NmS5U=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1 h1:Sd7LtAlpRJ50lAj49S+pT6K0OUt+4KsNzB2uUArrWKg=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1/go.mod h1:Fy3bbChFm4cZn6oIxYYqKB2FG3rBDxk3NZDLDJCHl+Q=
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1 h1:bFa9IcjvrCber6gGgDAUZ+I2bO8J7s8JxXmu9fhi2ss=
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1/go.mod h1:l3wvZkG9oW07GLBW5Cd0WwG5asOfJ8aqE8raUvNzLpk=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY=
Expand Down
2 changes: 2 additions & 0 deletions tests/certification/bindings/azure/cosmosdb/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ require (
github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect
github.com/Azure/azure-amqp-common-go/v3 v3.2.3 // indirect
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect
github.com/Azure/azure-storage-blob-go v0.10.0 // indirect
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd // indirect
Expand Down
4 changes: 4 additions & 0 deletions tests/certification/bindings/azure/cosmosdb/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,14 @@ github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9a
github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc=
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible h1:HzKLt3kIwMm4KeJYTdx9EbjRYTySD/t8i1Ee/W5EGXw=
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 h1:Ut0ZGdOwJDw0npYEg+TLlPls3Pq6JiZaP2/aGKir7Zw=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1 h1:Sd7LtAlpRJ50lAj49S+pT6K0OUt+4KsNzB2uUArrWKg=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1/go.mod h1:Fy3bbChFm4cZn6oIxYYqKB2FG3rBDxk3NZDLDJCHl+Q=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-storage-blob-go v0.10.0 h1:evCwGreYo3XLeBV4vSxLbLiYb6e0SzsJiXQVRGsRXxs=
Expand Down