Skip to content

Commit

Permalink
Adding in track2 SDK support for CosmosDB Bindings (dapr#1876)
Browse files Browse the repository at this point in the history
* Adding in track2 SDK support for CosmosDB Bindings

Signed-off-by: Ryan Lettieri <[email protected]>

* Removing non needed ID check

Signed-off-by: Ryan Lettieri <[email protected]>

* Addressing container read operation

Signed-off-by: Ryan Lettieri <[email protected]>

* Updating linting

Signed-off-by: Ryan Lettieri <[email protected]>

Co-authored-by: Ryan Lettieri <[email protected]>
Co-authored-by: Dapr Bot <[email protected]>
Signed-off-by: Andrew Duss <[email protected]>
  • Loading branch information
3 people authored and Andrew Duss committed Aug 18, 2022
1 parent 6b822ca commit b0bdd5c
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 84 deletions.
125 changes: 41 additions & 84 deletions bindings/azure/cosmosdb/cosmosdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import (
"strings"
"time"

"github.com/a8m/documentdb"
backoff "github.com/cenkalti/backoff/v4"
"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"

"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/internal/authentication/azure"
Expand All @@ -30,8 +29,7 @@ import (

// CosmosDB allows performing state operations on collections.
type CosmosDB struct {
client *documentdb.DocumentDB
collection string
client *azcosmos.ContainerClient
partitionKey string

logger logger.Logger
Expand All @@ -45,7 +43,8 @@ type cosmosDBCredentials struct {
PartitionKey string `json:"partitionKey"`
}

const statusTooManyRequests = "429" // RFC 6585, 4
// Value used for timeout durations
const timeoutValue = 30

// NewCosmosDB returns a new CosmosDB instance.
func NewCosmosDB(logger logger.Logger) *CosmosDB {
Expand All @@ -62,57 +61,43 @@ func (c *CosmosDB) Init(metadata bindings.Metadata) error {
c.partitionKey = m.PartitionKey

// Create the client; first, try authenticating with a master key, if present
var config *documentdb.Config
var client *azcosmos.Client
if m.MasterKey != "" {
config = documentdb.NewConfig(&documentdb.Key{
Key: m.MasterKey,
})
cred, keyErr := azcosmos.NewKeyCredential(m.MasterKey)
if keyErr != nil {
return keyErr
}
client, err = azcosmos.NewClientWithKey(m.URL, cred, nil)
if err != nil {
return err
}
} else {
// Fallback to using Azure AD
env, errB := azure.NewEnvironmentSettings("cosmosdb", metadata.Properties)
if errB != nil {
return errB
env, errEnv := azure.NewEnvironmentSettings("cosmosdb", metadata.Properties)
if errEnv != nil {
return errEnv
}
spt, errB := env.GetServicePrincipalToken()
if errB != nil {
return errB
token, errToken := env.GetTokenCredential()
if errToken != nil {
return errToken
}
config = documentdb.NewConfigWithServicePrincipal(spt)
}
// disable the identification hydrator (which autogenerates IDs if missing from the request)
// so we aren't forced to use a struct by the upstream SDK
// this allows us to provide the most flexibility in the request document sent to this binding
config.IdentificationHydrator = nil
config.WithAppIdentifier("dapr-" + logger.DaprVersion)

c.client = documentdb.New(m.URL, config)

// Retries initializing the client if a TooManyRequests error is encountered
err = retryOperation(func() (err error) {
collLink := fmt.Sprintf("dbs/%s/colls/%s/", m.Database, m.Collection)
coll, err := c.client.ReadCollection(collLink)
client, err = azcosmos.NewClient(m.URL, token, nil)
if err != nil {
if isTooManyRequestsError(err) {
return err
}
return backoff.Permanent(err)
} else if coll == nil || coll.Self == "" {
return backoff.Permanent(
fmt.Errorf("collection %s in database %s for CosmosDB state store not found. This must be created before Dapr uses it", m.Collection, m.Database),
)
return err
}
}

c.collection = coll.Self

return nil
}, func(err error, d time.Duration) {
c.logger.Warnf("CosmosDB binding initialization failed: %v; retrying in %s", err, d)
}, 5*time.Minute)
// Create a container client
dbContainer, err := client.NewContainer(m.Database, m.Collection)
if err != nil {
return err
}

return nil
c.client = dbContainer
ctx, cancel := context.WithTimeout(context.Background(), timeoutValue*time.Second)
_, err = c.client.Read(ctx, nil)
cancel()
return err
}

func (c *CosmosDB) parseMetadata(metadata bindings.Metadata) (*cosmosDBCredentials, error) {
Expand All @@ -135,7 +120,7 @@ func (c *CosmosDB) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}

func (c *CosmosDB) Invoke(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
func (c *CosmosDB) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
switch req.Operation {
case bindings.CreateOperation:
var obj interface{}
Expand All @@ -144,41 +129,34 @@ func (c *CosmosDB) Invoke(_ context.Context, req *bindings.InvokeRequest) (*bind
return nil, err
}

val, err := c.getPartitionKeyValue(c.partitionKey, obj)
pkString, err := c.getPartitionKeyValue(c.partitionKey, obj)
if err != nil {
return nil, err
}
pk := azcosmos.NewPartitionKeyString(pkString)

err = retryOperation(func() error {
_, innerErr := c.client.CreateDocument(c.collection, obj, documentdb.PartitionKey(val))
if innerErr != nil {
if isTooManyRequestsError(innerErr) {
return innerErr
}
return backoff.Permanent(innerErr)
}
return nil
}, func(err error, d time.Duration) {
c.logger.Warnf("CosmosDB binding Invoke request failed: %v; retrying in %s", err, d)
}, 20*time.Second)
_, err = c.client.CreateItem(ctx, pk, req.Data, nil)
if err != nil {
return nil, err
}

return nil, nil
default:
return nil, fmt.Errorf("operation kind %s not supported", req.Operation)
}
}

func (c *CosmosDB) getPartitionKeyValue(key string, obj interface{}) (interface{}, error) {
val, err := c.lookup(obj.(map[string]interface{}), strings.Split(key, "."))
func (c *CosmosDB) getPartitionKeyValue(key string, obj interface{}) (string, error) {
valI, err := c.lookup(obj.(map[string]interface{}), strings.Split(key, "."))
if err != nil {
return nil, fmt.Errorf("missing partitionKey field %s from request body - %w", c.partitionKey, err)
return "", fmt.Errorf("missing partitionKey field %s from request body - %w", c.partitionKey, err)
}
val, ok := valI.(string)
if !ok {
return "", fmt.Errorf("partition key is not a string")
}

if val == "" {
return nil, fmt.Errorf("partitionKey field %s from request body is empty", c.partitionKey)
return "", fmt.Errorf("partitionKey field %s from request body is empty", c.partitionKey)
}

return val, nil
Expand Down Expand Up @@ -209,24 +187,3 @@ func (c *CosmosDB) lookup(m map[string]interface{}, ks []string) (val interface{

return c.lookup(m, ks[1:])
}

func retryOperation(operation backoff.Operation, notify backoff.Notify, maxElapsedTime time.Duration) error {
bo := backoff.NewExponentialBackOff()
bo.InitialInterval = 2 * time.Second
bo.MaxElapsedTime = maxElapsedTime
return backoff.RetryNotify(operation, bo, notify)
}

func isTooManyRequestsError(err error) bool {
if err == nil {
return false
}

if requestError, ok := err.(*documentdb.RequestError); ok {
if requestError.Code == statusTooManyRequests {
return true
}
}

return false
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.7.1
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.0.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4Sath
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0=
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.0 h1:h/72OERa/5hgnKEOyQJ8gtJoTVX3uwHCavsraGadTZM=
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.0/go.mod h1:p74+tP95m8830ypJk53L93+BEsjTKY4SKQ75J2NmS5U=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1 h1:Sd7LtAlpRJ50lAj49S+pT6K0OUt+4KsNzB2uUArrWKg=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1/go.mod h1:Fy3bbChFm4cZn6oIxYYqKB2FG3rBDxk3NZDLDJCHl+Q=
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1 h1:bFa9IcjvrCber6gGgDAUZ+I2bO8J7s8JxXmu9fhi2ss=
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1/go.mod h1:l3wvZkG9oW07GLBW5Cd0WwG5asOfJ8aqE8raUvNzLpk=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY=
Expand Down
2 changes: 2 additions & 0 deletions tests/certification/bindings/azure/cosmosdb/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ require (
github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect
github.com/Azure/azure-amqp-common-go/v3 v3.2.3 // indirect
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect
github.com/Azure/azure-storage-blob-go v0.10.0 // indirect
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd // indirect
Expand Down
4 changes: 4 additions & 0 deletions tests/certification/bindings/azure/cosmosdb/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,14 @@ github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9a
github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc=
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible h1:HzKLt3kIwMm4KeJYTdx9EbjRYTySD/t8i1Ee/W5EGXw=
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 h1:Ut0ZGdOwJDw0npYEg+TLlPls3Pq6JiZaP2/aGKir7Zw=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1 h1:Sd7LtAlpRJ50lAj49S+pT6K0OUt+4KsNzB2uUArrWKg=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1/go.mod h1:Fy3bbChFm4cZn6oIxYYqKB2FG3rBDxk3NZDLDJCHl+Q=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-storage-blob-go v0.10.0 h1:evCwGreYo3XLeBV4vSxLbLiYb6e0SzsJiXQVRGsRXxs=
Expand Down

0 comments on commit b0bdd5c

Please sign in to comment.