From 3ca593ccd3c44bbe45866fdc5af4fb394ad3234f Mon Sep 17 00:00:00 2001 From: Ryan Lettieri Date: Tue, 12 Jul 2022 14:22:31 -0600 Subject: [PATCH 1/4] Adding in track2 SDK support for CosmosDB Bindings Signed-off-by: Ryan Lettieri --- bindings/azure/cosmosdb/cosmosdb.go | 133 +++++++----------- go.mod | 1 + go.sum | 2 + .../bindings/azure/cosmosdb/go.mod | 2 + .../bindings/azure/cosmosdb/go.sum | 4 + 5 files changed, 59 insertions(+), 83 deletions(-) diff --git a/bindings/azure/cosmosdb/cosmosdb.go b/bindings/azure/cosmosdb/cosmosdb.go index 22fa89a711..f22f6f213d 100644 --- a/bindings/azure/cosmosdb/cosmosdb.go +++ b/bindings/azure/cosmosdb/cosmosdb.go @@ -17,11 +17,11 @@ import ( "context" "encoding/json" "fmt" + "net/http" "strings" "time" - "github.com/a8m/documentdb" - backoff "github.com/cenkalti/backoff/v4" + "github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos" "github.com/dapr/components-contrib/bindings" "github.com/dapr/components-contrib/internal/authentication/azure" @@ -30,8 +30,7 @@ import ( // CosmosDB allows performing state operations on collections. type CosmosDB struct { - client *documentdb.DocumentDB - collection string + client *azcosmos.ContainerClient partitionKey string logger logger.Logger @@ -45,7 +44,8 @@ type cosmosDBCredentials struct { PartitionKey string `json:"partitionKey"` } -const statusTooManyRequests = "429" // RFC 6585, 4 +// Value used for timeout durations +const timeoutValue = 30 // NewCosmosDB returns a new CosmosDB instance. func NewCosmosDB(logger logger.Logger) *CosmosDB { @@ -62,56 +62,51 @@ func (c *CosmosDB) Init(metadata bindings.Metadata) error { c.partitionKey = m.PartitionKey // Create the client; first, try authenticating with a master key, if present - var config *documentdb.Config + var client *azcosmos.Client if m.MasterKey != "" { - config = documentdb.NewConfig(&documentdb.Key{ - Key: m.MasterKey, - }) + cred, keyErr := azcosmos.NewKeyCredential(m.MasterKey) + if keyErr != nil { + return keyErr + } + client, err = azcosmos.NewClientWithKey(m.URL, cred, nil) + if err != nil { + return err + } } else { // Fallback to using Azure AD - env, errB := azure.NewEnvironmentSettings("cosmosdb", metadata.Properties) - if errB != nil { - return errB + env, errEnv := azure.NewEnvironmentSettings("cosmosdb", metadata.Properties) + if errEnv != nil { + return errEnv } - spt, errB := env.GetServicePrincipalToken() - if errB != nil { - return errB + token, errToken := env.GetTokenCredential() + if errToken != nil { + return errToken } - config = documentdb.NewConfigWithServicePrincipal(spt) - } - // disable the identification hydrator (which autogenerates IDs if missing from the request) - // so we aren't forced to use a struct by the upstream SDK - // this allows us to provide the most flexibility in the request document sent to this binding - config.IdentificationHydrator = nil - config.WithAppIdentifier("dapr-" + logger.DaprVersion) - - c.client = documentdb.New(m.URL, config) - - // Retries initializing the client if a TooManyRequests error is encountered - err = retryOperation(func() (err error) { - collLink := fmt.Sprintf("dbs/%s/colls/%s/", m.Database, m.Collection) - coll, err := c.client.ReadCollection(collLink) + client, err = azcosmos.NewClient(m.URL, token, nil) if err != nil { - if isTooManyRequestsError(err) { - return err - } - return backoff.Permanent(err) - } else if coll == nil || coll.Self == "" { - return backoff.Permanent( - fmt.Errorf("collection %s in database %s for CosmosDB state store not found. This must be created before Dapr uses it", m.Collection, m.Database), - ) + return err } + } - c.collection = coll.Self - - return nil - }, func(err error, d time.Duration) { - c.logger.Warnf("CosmosDB binding initialization failed: %v; retrying in %s", err, d) - }, 5*time.Minute) + // Create a container client + dbContainer, err := client.NewContainer(m.Database, m.Collection) + if err != nil { + return err + } + if dbContainer.ID() == "" { + return fmt.Errorf("container is non-existent") + } + c.client = dbContainer + ctx, cancel := context.WithTimeout(context.Background(), timeoutValue*time.Second) + containerRead, err := c.client.Read(ctx, nil) + cancel() if err != nil { return err } + if containerRead.RawResponse.StatusCode == http.StatusNotFound { + return fmt.Errorf("error when attempting to read client with error code: %v", containerRead.RawResponse.StatusCode) + } return nil } @@ -135,7 +130,7 @@ func (c *CosmosDB) Operations() []bindings.OperationKind { return []bindings.OperationKind{bindings.CreateOperation} } -func (c *CosmosDB) Invoke(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { +func (c *CosmosDB) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { switch req.Operation { case bindings.CreateOperation: var obj interface{} @@ -144,41 +139,34 @@ func (c *CosmosDB) Invoke(_ context.Context, req *bindings.InvokeRequest) (*bind return nil, err } - val, err := c.getPartitionKeyValue(c.partitionKey, obj) + pkString, err := c.getPartitionKeyValue(c.partitionKey, obj) if err != nil { return nil, err } + pk := azcosmos.NewPartitionKeyString(pkString) - err = retryOperation(func() error { - _, innerErr := c.client.CreateDocument(c.collection, obj, documentdb.PartitionKey(val)) - if innerErr != nil { - if isTooManyRequestsError(innerErr) { - return innerErr - } - return backoff.Permanent(innerErr) - } - return nil - }, func(err error, d time.Duration) { - c.logger.Warnf("CosmosDB binding Invoke request failed: %v; retrying in %s", err, d) - }, 20*time.Second) + _, err = c.client.CreateItem(ctx, pk, req.Data, nil) if err != nil { return nil, err } - return nil, nil default: return nil, fmt.Errorf("operation kind %s not supported", req.Operation) } } -func (c *CosmosDB) getPartitionKeyValue(key string, obj interface{}) (interface{}, error) { - val, err := c.lookup(obj.(map[string]interface{}), strings.Split(key, ".")) +func (c *CosmosDB) getPartitionKeyValue(key string, obj interface{}) (string, error) { + valI, err := c.lookup(obj.(map[string]interface{}), strings.Split(key, ".")) if err != nil { - return nil, fmt.Errorf("missing partitionKey field %s from request body - %w", c.partitionKey, err) + return "", fmt.Errorf("missing partitionKey field %s from request body - %w", c.partitionKey, err) + } + val, ok := valI.(string) + if !ok { + return "", fmt.Errorf("partition key is not a string") } if val == "" { - return nil, fmt.Errorf("partitionKey field %s from request body is empty", c.partitionKey) + return "", fmt.Errorf("partitionKey field %s from request body is empty", c.partitionKey) } return val, nil @@ -209,24 +197,3 @@ func (c *CosmosDB) lookup(m map[string]interface{}, ks []string) (val interface{ return c.lookup(m, ks[1:]) } - -func retryOperation(operation backoff.Operation, notify backoff.Notify, maxElapsedTime time.Duration) error { - bo := backoff.NewExponentialBackOff() - bo.InitialInterval = 2 * time.Second - bo.MaxElapsedTime = maxElapsedTime - return backoff.RetryNotify(operation, bo, notify) -} - -func isTooManyRequestsError(err error) bool { - if err == nil { - return false - } - - if requestError, ok := err.(*documentdb.RequestError); ok { - if requestError.Code == statusTooManyRequests { - return true - } - } - - return false -} diff --git a/go.mod b/go.mod index 8693956acc..c0a9316790 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/Azure/azure-sdk-for-go v65.0.0+incompatible github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 + github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1 github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1 github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.7.1 github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.0.1 diff --git a/go.sum b/go.sum index 18adbeee2b..236773348c 100644 --- a/go.sum +++ b/go.sum @@ -112,6 +112,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4Sath github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0= github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.0 h1:h/72OERa/5hgnKEOyQJ8gtJoTVX3uwHCavsraGadTZM= github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.0/go.mod h1:p74+tP95m8830ypJk53L93+BEsjTKY4SKQ75J2NmS5U= +github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1 h1:Sd7LtAlpRJ50lAj49S+pT6K0OUt+4KsNzB2uUArrWKg= +github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1/go.mod h1:Fy3bbChFm4cZn6oIxYYqKB2FG3rBDxk3NZDLDJCHl+Q= github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1 h1:bFa9IcjvrCber6gGgDAUZ+I2bO8J7s8JxXmu9fhi2ss= github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1/go.mod h1:l3wvZkG9oW07GLBW5Cd0WwG5asOfJ8aqE8raUvNzLpk= github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY= diff --git a/tests/certification/bindings/azure/cosmosdb/go.mod b/tests/certification/bindings/azure/cosmosdb/go.mod index b362ab2ed1..6d6d59befb 100644 --- a/tests/certification/bindings/azure/cosmosdb/go.mod +++ b/tests/certification/bindings/azure/cosmosdb/go.mod @@ -19,8 +19,10 @@ require ( github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect github.com/Azure/azure-amqp-common-go/v3 v3.2.3 // indirect github.com/Azure/azure-pipeline-go v0.2.3 // indirect + github.com/Azure/azure-sdk-for-go v65.0.0+incompatible // indirect github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect github.com/Azure/azure-storage-blob-go v0.10.0 // indirect github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd // indirect diff --git a/tests/certification/bindings/azure/cosmosdb/go.sum b/tests/certification/bindings/azure/cosmosdb/go.sum index 9b2f7f8816..09592a1f59 100644 --- a/tests/certification/bindings/azure/cosmosdb/go.sum +++ b/tests/certification/bindings/azure/cosmosdb/go.sum @@ -49,10 +49,14 @@ github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9a github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc= github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U= github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k= +github.com/Azure/azure-sdk-for-go v65.0.0+incompatible h1:HzKLt3kIwMm4KeJYTdx9EbjRYTySD/t8i1Ee/W5EGXw= +github.com/Azure/azure-sdk-for-go v65.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 h1:Ut0ZGdOwJDw0npYEg+TLlPls3Pq6JiZaP2/aGKir7Zw= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0= +github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1 h1:Sd7LtAlpRJ50lAj49S+pT6K0OUt+4KsNzB2uUArrWKg= +github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1/go.mod h1:Fy3bbChFm4cZn6oIxYYqKB2FG3rBDxk3NZDLDJCHl+Q= github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY= github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w= github.com/Azure/azure-storage-blob-go v0.10.0 h1:evCwGreYo3XLeBV4vSxLbLiYb6e0SzsJiXQVRGsRXxs= From 9c63eb50985c48e38786497dcee022659d0f281d Mon Sep 17 00:00:00 2001 From: Ryan Lettieri Date: Mon, 8 Aug 2022 16:58:52 -0600 Subject: [PATCH 2/4] Removing non needed ID check Signed-off-by: Ryan Lettieri --- bindings/azure/cosmosdb/cosmosdb.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/bindings/azure/cosmosdb/cosmosdb.go b/bindings/azure/cosmosdb/cosmosdb.go index f22f6f213d..a30e786d76 100644 --- a/bindings/azure/cosmosdb/cosmosdb.go +++ b/bindings/azure/cosmosdb/cosmosdb.go @@ -93,9 +93,7 @@ func (c *CosmosDB) Init(metadata bindings.Metadata) error { if err != nil { return err } - if dbContainer.ID() == "" { - return fmt.Errorf("container is non-existent") - } + c.client = dbContainer ctx, cancel := context.WithTimeout(context.Background(), timeoutValue*time.Second) containerRead, err := c.client.Read(ctx, nil) From d048b2c5e59da8f0e9f41e61442454cfffdb2a54 Mon Sep 17 00:00:00 2001 From: Ryan Lettieri <67934986+RyanLettieri@users.noreply.github.com> Date: Tue, 9 Aug 2022 13:06:51 -0600 Subject: [PATCH 3/4] Addressing container read operation Signed-off-by: Ryan Lettieri <67934986+RyanLettieri@users.noreply.github.com> --- bindings/azure/cosmosdb/cosmosdb.go | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/bindings/azure/cosmosdb/cosmosdb.go b/bindings/azure/cosmosdb/cosmosdb.go index a30e786d76..19b016c045 100644 --- a/bindings/azure/cosmosdb/cosmosdb.go +++ b/bindings/azure/cosmosdb/cosmosdb.go @@ -96,16 +96,9 @@ func (c *CosmosDB) Init(metadata bindings.Metadata) error { c.client = dbContainer ctx, cancel := context.WithTimeout(context.Background(), timeoutValue*time.Second) - containerRead, err := c.client.Read(ctx, nil) + _, err := c.client.Read(ctx, nil) cancel() - if err != nil { - return err - } - - if containerRead.RawResponse.StatusCode == http.StatusNotFound { - return fmt.Errorf("error when attempting to read client with error code: %v", containerRead.RawResponse.StatusCode) - } - return nil + return err } func (c *CosmosDB) parseMetadata(metadata bindings.Metadata) (*cosmosDBCredentials, error) { From 61cb169551c94455752478557331931023be8421 Mon Sep 17 00:00:00 2001 From: Ryan Lettieri <67934986+RyanLettieri@users.noreply.github.com> Date: Tue, 9 Aug 2022 13:13:04 -0600 Subject: [PATCH 4/4] Updating linting Signed-off-by: Ryan Lettieri <67934986+RyanLettieri@users.noreply.github.com> --- bindings/azure/cosmosdb/cosmosdb.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bindings/azure/cosmosdb/cosmosdb.go b/bindings/azure/cosmosdb/cosmosdb.go index 19b016c045..3b3abad0d4 100644 --- a/bindings/azure/cosmosdb/cosmosdb.go +++ b/bindings/azure/cosmosdb/cosmosdb.go @@ -17,7 +17,6 @@ import ( "context" "encoding/json" "fmt" - "net/http" "strings" "time" @@ -96,7 +95,7 @@ func (c *CosmosDB) Init(metadata bindings.Metadata) error { c.client = dbContainer ctx, cancel := context.WithTimeout(context.Background(), timeoutValue*time.Second) - _, err := c.client.Read(ctx, nil) + _, err = c.client.Read(ctx, nil) cancel() return err }