From 120a649caa2178f7d983af09b4d542753129fa61 Mon Sep 17 00:00:00 2001 From: luigirende Date: Thu, 25 Jan 2024 01:49:33 +0100 Subject: [PATCH] CosmosDBState - Add, as option, the partitionKey in QueryMethod (#3227) Signed-off-by: luigirende Signed-off-by: Luigi Rende Signed-off-by: Bernd Verst Co-authored-by: Bernd Verst --- state/azure/cosmosdb/cosmosdb.go | 14 +++- state/azure/cosmosdb/cosmosdb_query.go | 16 +++-- state/feature.go | 2 + tests/conformance/state/state.go | 97 +++++++++++++++++++++++--- 4 files changed, 115 insertions(+), 14 deletions(-) diff --git a/state/azure/cosmosdb/cosmosdb.go b/state/azure/cosmosdb/cosmosdb.go index 4b415d3ce9..e17f3bd373 100644 --- a/state/azure/cosmosdb/cosmosdb.go +++ b/state/azure/cosmosdb/cosmosdb.go @@ -92,8 +92,11 @@ func (p crossPartitionQueryPolicy) Do(req *policy.Request) (*http.Response, erro // Check if we're performing a query // In that case, remove the partitionkey header and enable cross-partition queries if strings.ToLower(raw.Header.Get("x-ms-documentdb-query")) == "true" { - raw.Header.Add("x-ms-documentdb-query-enablecrosspartition", "true") - raw.Header.Del("x-ms-documentdb-partitionkey") + // Only when the partitionKey is fake (true), it will be removed amd enabled the cross partition + if strings.ToLower(raw.Header.Get("x-ms-documentdb-partitionkey")) == "[true]" { + raw.Header.Add("x-ms-documentdb-query-enablecrosspartition", "true") + raw.Header.Del("x-ms-documentdb-partitionkey") + } } return req.Next() } @@ -206,6 +209,7 @@ func (c *StateStore) Features() []state.Feature { state.FeatureTransactional, state.FeatureQueryAPI, state.FeatureTTL, + state.FeaturePartitionKey, } } @@ -594,6 +598,12 @@ func (c *StateStore) Query(ctx context.Context, req *state.QueryRequest) (*state return &state.QueryResponse{}, err } + // If present partitionKey, the value will be used in the query disabling the cross partition + q.partitionKey = "" + if val, found := req.Metadata[metadataPartitionKey]; found { + q.partitionKey = val + } + data, token, err := q.execute(ctx, c.client) if err != nil { return nil, err diff --git a/state/azure/cosmosdb/cosmosdb_query.go b/state/azure/cosmosdb/cosmosdb_query.go index fa8ed11d52..72d17ea71c 100644 --- a/state/azure/cosmosdb/cosmosdb_query.go +++ b/state/azure/cosmosdb/cosmosdb_query.go @@ -34,9 +34,10 @@ type InternalQuery struct { } type Query struct { - query InternalQuery - limit int - token string + query InternalQuery + limit int + token string + partitionKey string } func (q *Query) VisitEQ(f *query.EQ) (string, error) { @@ -264,7 +265,14 @@ func (q *Query) execute(ctx context.Context, client *azcosmos.ContainerClient) ( } items := []CosmosItem{} - pk := azcosmos.NewPartitionKeyBool(true) + + var pk azcosmos.PartitionKey + if q.partitionKey != "" { + pk = azcosmos.NewPartitionKeyString(q.partitionKey) + } else { + pk = azcosmos.NewPartitionKeyBool(true) + } + queryPager := client.NewQueryItemsPager(q.query.query, pk, opts) token := "" diff --git a/state/feature.go b/state/feature.go index 5985dff612..fb346e2113 100644 --- a/state/feature.go +++ b/state/feature.go @@ -28,6 +28,8 @@ const ( FeatureTTL Feature = "TTL" // FeatureDeleteWithPrefix is the feature that supports deleting with prefix. FeatureDeleteWithPrefix Feature = "DELETE_WITH_PREFIX" + // FeaturePartitionKey is the feature that supports the partition + FeaturePartitionKey Feature = "PARTITION_KEY" ) // Feature names a feature that can be implemented by state store components. diff --git a/tests/conformance/state/state.go b/tests/conformance/state/state.go index fab6693d2d..ef9dd51828 100644 --- a/tests/conformance/state/state.go +++ b/tests/conformance/state/state.go @@ -44,7 +44,8 @@ type StructType struct { Product struct { Value int `json:"value"` } `json:"product"` - Status string `json:"status"` + Status string `json:"status"` + Message string `json:"message"` } type intValueType struct { @@ -62,8 +63,10 @@ type scenario struct { } type queryScenario struct { - query string - results []state.QueryItem + query string + results []state.QueryItem + metadata map[string]string + partitionOnly bool } type TestConfig struct { @@ -130,14 +133,19 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St key: fmt.Sprintf("%s-struct-operations", key), value: StructType{Product: struct { Value int `json:"value"` - }{Value: 15}, Status: "ACTIVE"}, + }{Value: 15}, Status: "ACTIVE", Message: fmt.Sprintf("%smessage", key)}, contentType: contenttype.JSONContentType, }, { key: fmt.Sprintf("%s-struct-operations-inactive", key), value: StructType{Product: struct { Value int `json:"value"` - }{Value: 12}, Status: "INACTIVE"}, + }{Value: 12}, Status: "INACTIVE", Message: fmt.Sprintf("%smessage", key)}, + contentType: contenttype.JSONContentType, + }, + { + key: fmt.Sprintf("%s-struct-2", key), + value: ValueType{Message: fmt.Sprintf("%stest", key)}, contentType: contenttype.JSONContentType, }, { @@ -261,6 +269,9 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St { "filter": { "AND": [ + { + "EQ": {"message": "` + key + `message"} + }, { "GTE": {"product.value": 10} }, @@ -277,7 +288,7 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St results: []state.QueryItem{ { Key: fmt.Sprintf("%s-struct-operations", key), - Data: []byte(fmt.Sprintf(`{"product":{"value":15}, "status":"ACTIVE"}`)), + Data: []byte(fmt.Sprintf(`{"product":{"value":15}, "status":"ACTIVE","message":"%smessage"}`, key)), }, }, }, @@ -288,6 +299,9 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St "OR": [ { "AND": [ + { + "EQ": {"message": "` + key + `message"} + }, { "GT": {"product.value": 11.1} }, @@ -298,6 +312,9 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St }, { "AND": [ + { + "EQ": {"message": "` + key + `message"} + }, { "LTE": {"product.value": 0.5} }, @@ -313,7 +330,65 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St results: []state.QueryItem{ { Key: fmt.Sprintf("%s-struct-operations-inactive", key), - Data: []byte(fmt.Sprintf(`{"product":{"value":12}, "status":"INACTIVE"}`)), + Data: []byte(fmt.Sprintf(`{"product":{"value":12}, "status":"INACTIVE","message":"%smessage"}`, key)), + }, + }, + }, + + // In CosmosDB this query uses the cross partition ( value with 2 different partitionKey -struct and -struct-2) + { + query: ` + { + "filter": { + "OR": [ + { + "EQ": {"message": "` + key + `test"} + }, + { + "IN": {"message": ["test` + key + `", "dummy"]} + } + ] + } + } + `, + // Return 2 item from 2 different partitionKey (-struct and -struct-2), for default the partitionKey is equals to key + results: []state.QueryItem{ + { + Key: fmt.Sprintf("%s-struct", key), + Data: []byte(fmt.Sprintf(`{"message":"test%s"}`, key)), + }, + { + Key: fmt.Sprintf("%s-struct-2", key), + Data: []byte(fmt.Sprintf(`{"message":"%stest"}`, key)), + }, + }, + }, + + // Test for CosmosDB (filter test with partitionOnly) this query doesn't use the cross partition ( value from 2 different partitionKey %s-struct and %s-struct-2) + { + query: ` + { + "filter": { + "OR": [ + { + "EQ": {"message": "` + key + `test"} + }, + { + "IN": {"message": ["test` + key + `", "dummy"]} + } + ] + } + } + `, + metadata: map[string]string{ + "partitionKey": fmt.Sprintf("%s-struct-2", key), + }, + partitionOnly: true, + // The same query from previous test but return only item having the same partitionKey value (%s-struct-2) given in the metadata + results: []state.QueryItem{ + { + Key: fmt.Sprintf("%s-struct-2", key), + Data: []byte(fmt.Sprintf(`{"message":"%stest"}`, key)), }, }, }, @@ -382,10 +457,12 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St // Check if query feature is listed features := statestore.Features() require.True(t, state.FeatureQueryAPI.IsPresent(features)) - querier, ok := statestore.(state.Querier) assert.True(t, ok, "Querier interface is not implemented") for _, scenario := range queryScenarios { + if (scenario.partitionOnly) && (!state.FeaturePartitionKey.IsPresent(features)) { + break + } t.Logf("Querying value presence for %s", scenario.query) var req state.QueryRequest err := json.Unmarshal([]byte(scenario.query), &req.Query) @@ -395,6 +472,10 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St metadata.QueryIndexName: "qIndx", } + if val, found := scenario.metadata["partitionKey"]; found { + req.Metadata["partitionKey"] = val + } + resp, err := querier.Query(context.Background(), &req) require.NoError(t, err) assert.Equal(t, len(scenario.results), len(resp.Results))