Skip to content

Commit

Permalink
Merge pull request #48 from xataio/support-for-sub-aggs
Browse files Browse the repository at this point in the history
Support for sub aggs
  • Loading branch information
philkra authored Dec 22, 2023
2 parents 1632385 + 5250749 commit 6600529
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 24 deletions.
6 changes: 2 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ smoke-test: ## smoke tests
@cd internal/smoke-tests && go run . && cd ../..

test: ## run unit tests
@echo "Running unit tests"
@go test -v -count=1 -cover -race ./xata
TEST_DIRECTORY=./xata go run gotest.tools/gotestsum@latest --format testname

integration-test: ## run integration tests
@echo "Running integration test"
@go test -v -count=1 -cover -race ./internal/integration-tests
TEST_DIRECTORY=./internal/integration-tests go run gotest.tools/gotestsum@latest --format testname
$(MAKE) clean-workspaces

download-openapi-specs: ## download openapi specs
Expand Down
90 changes: 90 additions & 0 deletions internal/integration-tests/search_filter_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,36 @@ func Test_searchAndFilterClient(t *testing.T) {
}, time.Second*10, time.Second)
})

t.Run("aggregate table date histogram with nested aggregations", func(t *testing.T) {
assert.Eventually(t, func() bool {
aggTableRes, err := searchFilterCli.Aggregate(ctx, xata.AggregateTableRequest{
BranchRequestOptional: xata.BranchRequestOptional{
DatabaseName: xata.String(cfg.databaseName),
},
TableName: cfg.tableName,
Payload: xata.AggregateTableRequestPayload{
Aggregations: xata.AggExpressionMap{
"histogram": xata.NewDateHistogramAggExpression(xata.DateHistogramAgg{
Column: dateTimeColumn,
Interval: xata.String("1d"),
CalendarInterval: nil,
Timezone: nil,
Aggs: &xata.NestedAggsMap{
"max": xata.NewMaxAggExpression(integerColumn),
"avg": xata.NewAverageAggExpression(integerColumn),
},
}),
},
},
})
assert.NoError(t, err)
if (*aggTableRes.Aggs)["histogram"] != nil && (*aggTableRes.Aggs)["histogram"].AggResponseValues != nil {
return len((*aggTableRes.Aggs)["histogram"].AggResponseValues.Values.AggResponseValuesValuesItemList) > 0
}
return false
}, time.Second*10, time.Second)
})

t.Run("aggregate table top values", func(t *testing.T) {
assert.Eventually(t, func() bool {
aggTableRes, err := searchFilterCli.Aggregate(ctx, xata.AggregateTableRequest{
Expand All @@ -619,6 +649,34 @@ func Test_searchAndFilterClient(t *testing.T) {
}, time.Second*10, time.Second)
})

t.Run("aggregate table top values with nested aggs", func(t *testing.T) {
assert.Eventually(t, func() bool {
aggTableRes, err := searchFilterCli.Aggregate(ctx, xata.AggregateTableRequest{
BranchRequestOptional: xata.BranchRequestOptional{
DatabaseName: xata.String(cfg.databaseName),
},
TableName: cfg.tableName,
Payload: xata.AggregateTableRequestPayload{
Aggregations: xata.AggExpressionMap{
"top_values": xata.NewTopValuesAggExpression(xata.TopValuesAgg{
Column: stringColumn,
Size: nil,
Aggs: &xata.NestedAggsMap{
"max": xata.NewMaxAggExpression(integerColumn),
"avg": xata.NewAverageAggExpression(integerColumn),
},
}),
},
},
})
assert.NoError(t, err)
if (*aggTableRes.Aggs)["top_values"] != nil && (*aggTableRes.Aggs)["top_values"].AggResponseValues != nil {
return len((*aggTableRes.Aggs)["top_values"].AggResponseValues.Values.AggResponseValuesValuesItemList) > 0
}
return false
}, time.Second*10, time.Second)
})

t.Run("aggregate table numeric histogram", func(t *testing.T) {
assert.Eventually(t, func() bool {
aggTableRes, err := searchFilterCli.Aggregate(ctx, xata.AggregateTableRequest{
Expand All @@ -643,4 +701,36 @@ func Test_searchAndFilterClient(t *testing.T) {
return false
}, time.Second*10, time.Second)
})

t.Run("aggregate table numeric histogram with nested aggregations", func(t *testing.T) {
assert.Eventually(t, func() bool {
aggTableRes, err := searchFilterCli.Aggregate(ctx, xata.AggregateTableRequest{
BranchRequestOptional: xata.BranchRequestOptional{
DatabaseName: xata.String(cfg.databaseName),
},
TableName: cfg.tableName,
Payload: xata.AggregateTableRequestPayload{
Aggregations: xata.AggExpressionMap{
"num_histogram": xata.NewNumericHistogramAggExpression(xata.NumericHistogramAgg{
Column: integerColumn,
Interval: 1.0,
Offset: nil,
Aggs: &xata.NestedAggsMap{
"max": xata.NewMaxAggExpression(integerColumn),
"top_values": xata.NewTopValuesAggExpression(xata.TopValuesAgg{
Column: stringColumn,
Size: nil,
}),
},
}),
},
},
})
assert.NoError(t, err)
if (*aggTableRes.Aggs)["num_histogram"] != nil && (*aggTableRes.Aggs)["num_histogram"].AggResponseValues != nil {
return len((*aggTableRes.Aggs)["num_histogram"].AggResponseValues.Values.AggResponseValuesValuesItemList) > 0
}
return false
}, time.Second*10, time.Second)
})
}
29 changes: 10 additions & 19 deletions internal/integration-tests/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,21 @@ func setupDatabase() (*config, error) {
if !found {
return nil, fmt.Errorf("%s not found in env vars", "XATA_API_KEY")
}
// require workspace ID to come from the env var
// instead of creating new workspace on each client
wsID, found := os.LookupEnv("XATA_WORKSPACE_ID")
if !found {
return nil, fmt.Errorf("%s not found in env vars", "XATA_WORKSPACE_ID")
}

testID := testIdentifier()

cfg := &config{
apiKey: apiKey,
testID: testID,
wsID: wsID,
httpCli: retryablehttp.NewClient().StandardClient(),
}

workspaceCli, err := xata.NewWorkspacesClient(
xata.WithAPIKey(cfg.apiKey),
xata.WithHTTPClient(cfg.httpCli),
)
if err != nil {
return nil, err
}

ws, err := workspaceCli.Create(ctx, &xata.WorkspaceMeta{Name: "ws" + testID})
if err != nil {
return nil, err
}

cfg.wsID = ws.Id

databaseCli, err := xata.NewDatabasesClient(
xata.WithAPIKey(cfg.apiKey),
xata.WithHTTPClient(cfg.httpCli),
Expand All @@ -63,7 +54,7 @@ func setupDatabase() (*config, error) {
return nil, err
}

listRegionsResponse, err := databaseCli.GetRegionsWithWorkspaceID(ctx, ws.Id)
listRegionsResponse, err := databaseCli.GetRegionsWithWorkspaceID(ctx, cfg.wsID)
if err != nil {
return nil, err
}
Expand All @@ -77,8 +68,8 @@ func setupDatabase() (*config, error) {
)

db, err := databaseCli.Create(ctx, xata.CreateDatabaseRequest{
DatabaseName: "db" + testID,
WorkspaceID: xata.String(ws.Id),
DatabaseName: "db" + cfg.testID,
WorkspaceID: xata.String(cfg.wsID),
Region: &cfg.region,
UI: &xata.UI{Color: xata.String("RED")},
BranchMetaData: &xata.BranchMetadata{
Expand Down
85 changes: 84 additions & 1 deletion xata/search_filter_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,8 @@ func Test_searchAndFilterCli_Summarize(t *testing.T) {
}
}

func Test_searchAndFilterCli_Aggregate(t *testing.T) {
// Agg: Date Histogram
func Test_searchAndFilterCli_Aggregate_DateHistogram(t *testing.T) {
assert := assert.New(t)

type tc struct {
Expand Down Expand Up @@ -779,3 +780,85 @@ func Test_searchAndFilterCli_Aggregate(t *testing.T) {
})
}
}

// Agg: TopValues
// with nested aggregations
// https://github.com/xataio/xata-go/issues/47
func Test_searchAndFilterCli_Aggregate_TopValues_With_NestedAggs(t *testing.T) {
assert := assert.New(t)

type tc struct {
name string
want *xatagenworkspace.AggregateTableResponse
statusCode int
apiErr *xatagencore.APIError
}

aggRes := map[string]*xatagenworkspace.AggResponse{
"test": xatagenworkspace.NewAggResponseFromDoubleOptional(xata.Float64(2)),
}

tests := []tc{
{
name: "should aggregate a table with nested aggregation",
want: &xatagenworkspace.AggregateTableResponse{Aggs: &aggRes},
statusCode: http.StatusOK,
},
}

for _, eTC := range errTestCasesWorkspace {
tests = append(tests, tc{
name: eTC.name,
statusCode: eTC.statusCode,
apiErr: eTC.apiErr,
})
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testSrv := testService(t, http.MethodPost, "/db", tt.statusCode, tt.apiErr != nil, tt.want)

cli, err := xata.NewSearchAndFilterClient(
xata.WithBaseURL(testSrv.URL),
xata.WithAPIKey("test-key"),
)
assert.NoError(err)
assert.NotNil(cli)

got, err := cli.Aggregate(
context.TODO(),
xata.AggregateTableRequest{
BranchRequestOptional: xata.BranchRequestOptional{
DatabaseName: xata.String("my-db"),
},
TableName: "my-table",
Payload: xata.AggregateTableRequestPayload{
Aggregations: xata.AggExpressionMap{
"topKey": xata.NewTopValuesAggExpression(xata.TopValuesAgg{
Column: "my-column",
Size: xata.Int(5),
Aggs: &xata.NestedAggsMap{
"nestedKeyOne": xata.NewAverageAggExpression("my-sub-col_a"),
"nestedKeyTwo": xata.NewMaxAggExpression("my-sub-col_b"),
},
}),
},
},
},
)

if tt.apiErr != nil {
errAPI := tt.apiErr.Unwrap()
if errAPI == nil {
t.Fatal("expected error but got nil")
}
assert.ErrorAs(err, &errAPI)
assert.Equal(err.Error(), tt.apiErr.Error())
assert.Nil(got)
} else {
assert.Equal(tt.want, got)
assert.NoError(err)
}
})
}
}
11 changes: 11 additions & 0 deletions xata/search_filter_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ type AggExpression *xatagenworkspace.AggExpression

type AggExpressionMap = map[string]AggExpression

type NestedAggsMap = map[string]*xatagenworkspace.AggExpression

type AggExpressionCount xatagenworkspace.AggExpressionCount

type CountAggFilter struct {
Expand Down Expand Up @@ -298,6 +300,8 @@ const (

// Split data into buckets by a datetime column. Accepts sub-aggregations for each bucket.
type DateHistogramAgg struct {
// Nested Aggregations
Aggs *NestedAggsMap `json:"aggs,omitempty"`
// The column to use for bucketing. Must be of type datetime.
Column string
// The fixed interval to use when bucketing.
Expand All @@ -315,6 +319,7 @@ type DateHistogramAgg struct {
func NewDateHistogramAggExpression(value DateHistogramAgg) *xatagenworkspace.AggExpression {
return xatagenworkspace.NewAggExpressionFromAggExpressionDateHistogram(&xatagenworkspace.AggExpressionDateHistogram{
DateHistogram: &xatagenworkspace.DateHistogramAgg{
Aggs: value.Aggs,
Column: value.Column,
Interval: value.Interval,
CalendarInterval: (*xatagenworkspace.DateHistogramAggCalendarInterval)(value.CalendarInterval),
Expand All @@ -326,6 +331,8 @@ func NewDateHistogramAggExpression(value DateHistogramAgg) *xatagenworkspace.Agg
// Split data into buckets by the unique values in a column. Accepts sub-aggregations for each bucket.
// The top values as ordered by the number of records (`$count`) are returned.
type TopValuesAgg struct {
// Nested Aggregations
Aggs *NestedAggsMap `json:"aggs,omitempty"`
// The column to use for bucketing. Accepted types are `string`, `email`, `int`, `float`, or `bool`.
Column string
// The maximum number of unique values to return.
Expand All @@ -337,12 +344,15 @@ func NewTopValuesAggExpression(value TopValuesAgg) *xatagenworkspace.AggExpressi
TopValues: &xatagenworkspace.TopValuesAgg{
Column: value.Column,
Size: value.Size,
Aggs: value.Aggs,
},
})
}

// Split data into buckets by dynamic numeric ranges. Accepts sub-aggregations for each bucket.
type NumericHistogramAgg struct {
// Nested Aggregations
Aggs *NestedAggsMap `json:"aggs,omitempty"`
// The column to use for bucketing. Must be of numeric type.
Column string
// The numeric interval to use for bucketing. The resulting buckets will be ranges
Expand All @@ -357,6 +367,7 @@ type NumericHistogramAgg struct {

func NewNumericHistogramAggExpression(value NumericHistogramAgg) *xatagenworkspace.AggExpression {
return xatagenworkspace.NewAggExpressionFromAggExpressionNumericHistogram(&xatagenworkspace.AggExpressionNumericHistogram{NumericHistogram: &xatagenworkspace.NumericHistogramAgg{
Aggs: value.Aggs,
Column: value.Column,
Offset: value.Offset,
Interval: value.Interval,
Expand Down
1 change: 1 addition & 0 deletions xata/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
personalAPIKeyLocation = "~/.config/xata/key"
defaultControlPlaneDomain = "api.xata.io"
xataAPIKeyEnvVar = "XATA_API_KEY"
xataWsIDEnvVar = "XATA_WORKSPACE_ID" // TODO: not in use yet
dbURLFormat = "https://{workspace_id}.{region}.xata.sh/db/{db_name}:{branch_name}"
defaultBranchName = "main"
configFileName = ".xatarc"
Expand Down

0 comments on commit 6600529

Please sign in to comment.