From 35929185e85f11eb33710592d4744d6616a88227 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Thu, 17 Aug 2023 19:03:48 -0700 Subject: [PATCH 1/3] feat(datastore): Support aggregation query in transaction --- datastore/integration_test.go | 44 ++++++++++++++++++++++++++++------- datastore/query.go | 1 - datastore/query_test.go | 5 ---- 3 files changed, 36 insertions(+), 14 deletions(-) diff --git a/datastore/integration_test.go b/datastore/integration_test.go index 5c873e16c2d5..e52f0d41d659 100644 --- a/datastore/integration_test.go +++ b/datastore/integration_test.go @@ -718,6 +718,7 @@ func TestIntegration_AggregationQueries(t *testing.T) { for i := range keys { keys[i] = IncompleteKey("SQChild", parent) } + beforeCreate := time.Now() keys, err := client.PutMulti(ctx, keys, children) if err != nil { t.Fatalf("client.PutMulti: %v", err) @@ -729,30 +730,57 @@ func TestIntegration_AggregationQueries(t *testing.T) { } }() - baseQuery := NewQuery("SQChild").Ancestor(parent) + // Create transaction with read before create + txBeforeCreate, err := client.NewTransaction(ctx, []TransactionOption{ReadOnly, WithReadTime(beforeCreate)}...) + if err != nil { + t.Fatalf("client.NewTransaction: %v", err) + } + + // Create transaction with read after create + txAfterCreate, err := client.NewTransaction(ctx, []TransactionOption{ReadOnly, WithReadTime(time.Now())}...) + if err != nil { + t.Fatalf("client.NewTransaction: %v", err) + } + testCases := []struct { - desc string - aggQuery *AggregationQuery - wantFailure bool - wantErrMsg string - wantAggResult AggregationResult + desc string + aggQuery *AggregationQuery + transactionOpts []TransactionOption + wantFailure bool + wantErrMsg string + wantAggResult AggregationResult }{ + { desc: "Count Failure - Missing index", - aggQuery: baseQuery.Filter("T>=", now).NewAggregationQuery().WithCount("count"), + aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T>=", now).NewAggregationQuery().WithCount("count"), wantFailure: true, wantErrMsg: "no matching index found", wantAggResult: nil, }, { desc: "Count Success", - aggQuery: baseQuery.Filter("T=", now).Filter("I>=", 3).NewAggregationQuery().WithCount("count"), + aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now).Filter("I>=", 3).NewAggregationQuery().WithCount("count"), wantFailure: false, wantErrMsg: "", wantAggResult: map[string]interface{}{ "count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 5}}, }, }, + { + desc: "Count in transaction before creating entities", + aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now).Transaction(txBeforeCreate).NewAggregationQuery().WithCount("count"), + wantAggResult: map[string]interface{}{ + "count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 0}}, + }, + }, + { + desc: "Count in transaction after creating entities", + aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now).Transaction(txAfterCreate).NewAggregationQuery().WithCount("count"), + wantAggResult: map[string]interface{}{ + "count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 8}}, + }, + }, } for _, testCase := range testCases { diff --git a/datastore/query.go b/datastore/query.go index 702dce38a873..292611e4b679 100644 --- a/datastore/query.go +++ b/datastore/query.go @@ -1026,7 +1026,6 @@ func DecodeCursor(s string) (Cursor, error) { // NewAggregationQuery returns an AggregationQuery with this query as its // base query. func (q *Query) NewAggregationQuery() *AggregationQuery { - q.eventual = true return &AggregationQuery{ query: q, aggregationQueries: make([]*pb.AggregationQuery_Aggregation, 0), diff --git a/datastore/query_test.go b/datastore/query_test.go index 22d13e1776ab..40fc8f327795 100644 --- a/datastore/query_test.go +++ b/datastore/query_test.go @@ -126,11 +126,6 @@ func fakeRunAggregationQuery(req *pb.RunAggregationQueryRequest) (*pb.RunAggrega }, }, }, - ReadOptions: &pb.ReadOptions{ - ConsistencyType: &pb.ReadOptions_ReadConsistency_{ - ReadConsistency: pb.ReadOptions_EVENTUAL, - }, - }, } if !proto.Equal(req, expectedIn) { return nil, fmt.Errorf("unsupported argument: got %v want %v", req, expectedIn) From 790680c6b23b09035dab5609eac1e687564f9810 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Mon, 21 Aug 2023 16:35:17 -0700 Subject: [PATCH 2/3] feat(datastore): Refactoring integration test --- datastore/integration_test.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/datastore/integration_test.go b/datastore/integration_test.go index e52f0d41d659..d2aa42271d70 100644 --- a/datastore/integration_test.go +++ b/datastore/integration_test.go @@ -718,8 +718,15 @@ func TestIntegration_AggregationQueries(t *testing.T) { for i := range keys { keys[i] = IncompleteKey("SQChild", parent) } - beforeCreate := time.Now() - keys, err := client.PutMulti(ctx, keys, children) + + // Create transaction with read before creating entities + readTime := time.Now() + txBeforeCreate, err := client.NewTransaction(ctx, []TransactionOption{ReadOnly, WithReadTime(readTime)}...) + if err != nil { + t.Fatalf("client.NewTransaction: %v", err) + } + + keys, err = client.PutMulti(ctx, keys, children) if err != nil { t.Fatalf("client.PutMulti: %v", err) } @@ -730,14 +737,9 @@ func TestIntegration_AggregationQueries(t *testing.T) { } }() - // Create transaction with read before create - txBeforeCreate, err := client.NewTransaction(ctx, []TransactionOption{ReadOnly, WithReadTime(beforeCreate)}...) - if err != nil { - t.Fatalf("client.NewTransaction: %v", err) - } - - // Create transaction with read after create - txAfterCreate, err := client.NewTransaction(ctx, []TransactionOption{ReadOnly, WithReadTime(time.Now())}...) + // Create transaction with read after creating entities + readTime = time.Now() + txAfterCreate, err := client.NewTransaction(ctx, []TransactionOption{ReadOnly, WithReadTime(readTime)}...) if err != nil { t.Fatalf("client.NewTransaction: %v", err) } From 6b60e78096b205aec93d8f030b2b7b4e0c92b38b Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Tue, 22 Aug 2023 10:53:56 -0700 Subject: [PATCH 3/3] feat(datastore): Integration tests for sum and average --- datastore/integration_test.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/datastore/integration_test.go b/datastore/integration_test.go index 7baf0732d4a9..3dfd082c67f0 100644 --- a/datastore/integration_test.go +++ b/datastore/integration_test.go @@ -777,32 +777,32 @@ func TestIntegration_AggregationQueries(t *testing.T) { { desc: "Aggregations in transaction before creating entities", aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now). - Transaction(txBeforeCreate). - NewAggregationQuery(). - WithCount("count"). - WithSum("I", "sum"). - WithAvg("I", "avg"), + Transaction(txBeforeCreate). + NewAggregationQuery(). + WithCount("count"). + WithSum("I", "sum"). + WithAvg("I", "avg"), wantAggResult: map[string]interface{}{ "count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 0}}, - "sum": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 0}}, - "avg": &pb.Value{ValueType: &pb.Value_DoubleValue{DoubleValue: 0}}, + "sum": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 0}}, + "avg": &pb.Value{ValueType: &pb.Value_NullValue{NullValue: structpb.NullValue_NULL_VALUE}}, }, }, { desc: "Aggregations in transaction after creating entities", aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now). - Transaction(txAfterCreate). - NewAggregationQuery(). - WithCount("count"). - WithSum("I", "sum"). - WithAvg("I", "avg"), + Transaction(txAfterCreate). + NewAggregationQuery(). + WithCount("count"). + WithSum("I", "sum"). + WithAvg("I", "avg"), wantAggResult: map[string]interface{}{ "count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 8}}, - "sum": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 28}}, + "sum": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 28}}, "avg": &pb.Value{ValueType: &pb.Value_DoubleValue{DoubleValue: 3.5}}, }, }, - { + { desc: "Multiple aggregations", aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now). NewAggregationQuery().