diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md
index 96a5d297e50e..6ef9f9999df5 100644
--- a/docs/generated/http/full.md
+++ b/docs/generated/http/full.md
@@ -4098,6 +4098,8 @@ tenant pods.
| last_reset | [google.protobuf.Timestamp](#cockroach.server.serverpb.StatementsResponse-google.protobuf.Timestamp) | | Timestamp of the last stats reset. | [reserved](#support-status) |
| internal_app_name_prefix | [string](#cockroach.server.serverpb.StatementsResponse-string) | | If set and non-empty, indicates the prefix to application_name used for statements/queries issued internally by CockroachDB. | [reserved](#support-status) |
| transactions | [StatementsResponse.ExtendedCollectedTransactionStatistics](#cockroach.server.serverpb.StatementsResponse-cockroach.server.serverpb.StatementsResponse.ExtendedCollectedTransactionStatistics) | repeated | Transactions is transaction-level statistics for the collection of statements in this response. | [reserved](#support-status) |
+| stmts_total_runtime_secs | [float](#cockroach.server.serverpb.StatementsResponse-float) | | | [reserved](#support-status) |
+| txns_total_runtime_secs | [float](#cockroach.server.serverpb.StatementsResponse-float) | | | [reserved](#support-status) |
@@ -4170,12 +4172,28 @@ Support status: [reserved](#support-status)
| ----- | ---- | ----- | ----------- | -------------- |
| start | [int64](#cockroach.server.serverpb.CombinedStatementsStatsRequest-int64) | | Unix time range for aggregated statements. | [reserved](#support-status) |
| end | [int64](#cockroach.server.serverpb.CombinedStatementsStatsRequest-int64) | | | [reserved](#support-status) |
+| fetch_mode | [CombinedStatementsStatsRequest.FetchMode](#cockroach.server.serverpb.CombinedStatementsStatsRequest-cockroach.server.serverpb.CombinedStatementsStatsRequest.FetchMode) | | Note that if fetch_mode is set to transactions only, we will also include the statement statistics for the stmts in the transactions response. This is more of a hack-y method to get the complete stats for txns, because in the client we need to fill in some txn stats info from its stmt stats, such as the query string. We prefer this hackier method right now to reduce surface area for backporting these changes, but in the future we will introduce more endpoints to properly organize these differing requests. TODO (xinhaoz) - Split this API into stmts and txns properly instead of using this param. | [reserved](#support-status) |
+| limit | [int64](#cockroach.server.serverpb.CombinedStatementsStatsRequest-int64) | | | [reserved](#support-status) |
+
+#### CombinedStatementsStatsRequest.FetchMode
+
+
+
+| Field | Type | Label | Description | Support status |
+| ----- | ---- | ----- | ----------- | -------------- |
+| stats_type | [CombinedStatementsStatsRequest.StatsType](#cockroach.server.serverpb.CombinedStatementsStatsRequest-cockroach.server.serverpb.CombinedStatementsStatsRequest.StatsType) | | | [reserved](#support-status) |
+| sort | [StatsSortOptions](#cockroach.server.serverpb.CombinedStatementsStatsRequest-cockroach.server.serverpb.StatsSortOptions) | | | [reserved](#support-status) |
+
+
+
+
+
#### Response Parameters
@@ -4191,6 +4209,8 @@ Support status: [reserved](#support-status)
| last_reset | [google.protobuf.Timestamp](#cockroach.server.serverpb.StatementsResponse-google.protobuf.Timestamp) | | Timestamp of the last stats reset. | [reserved](#support-status) |
| internal_app_name_prefix | [string](#cockroach.server.serverpb.StatementsResponse-string) | | If set and non-empty, indicates the prefix to application_name used for statements/queries issued internally by CockroachDB. | [reserved](#support-status) |
| transactions | [StatementsResponse.ExtendedCollectedTransactionStatistics](#cockroach.server.serverpb.StatementsResponse-cockroach.server.serverpb.StatementsResponse.ExtendedCollectedTransactionStatistics) | repeated | Transactions is transaction-level statistics for the collection of statements in this response. | [reserved](#support-status) |
+| stmts_total_runtime_secs | [float](#cockroach.server.serverpb.StatementsResponse-float) | | | [reserved](#support-status) |
+| txns_total_runtime_secs | [float](#cockroach.server.serverpb.StatementsResponse-float) | | | [reserved](#support-status) |
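For orientation before the code changes below: a minimal caller-side sketch, not part of this diff, of how the new request fields (`fetch_mode`, `limit`) might be populated and the new response totals consumed. The `statusServer`, `ctx`, and `start`/`end` values are assumed; the type and enum names follow the `status.proto` additions further down.

```go
// Hypothetical sketch; statusServer, ctx, start, and end are assumed to exist.
req := &serverpb.CombinedStatementsStatsRequest{
	Start: start.Unix(), // optional unix-seconds bounds, as before
	End:   end.Unix(),
	Limit: 100, // 0 falls back to the SQLStatsResponseMax cluster setting
	FetchMode: &serverpb.CombinedStatementsStatsRequest_FetchMode{
		StatsType: serverpb.CombinedStatementsStatsRequest_TxnStatsOnly,
		Sort:      serverpb.StatsSortOptions_EXECUTION_COUNT,
	},
}
resp, err := statusServer.CombinedStatementStats(ctx, req)
if err != nil {
	return err
}
// TxnStatsOnly responses also carry the statements belonging to the returned
// transactions, plus the new runtime totals.
_ = resp.Transactions
_ = resp.Statements
_ = resp.StmtsTotalRuntimeSecs
_ = resp.TxnsTotalRuntimeSecs
```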
diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel
index 16227841d2d0..1745f9412a24 100644
--- a/pkg/BUILD.bazel
+++ b/pkg/BUILD.bazel
@@ -2060,6 +2060,7 @@ GO_TARGETS = [
"//pkg/ts/testmodel:testmodel",
"//pkg/ts/testmodel:testmodel_test",
"//pkg/ts/tspb:tspb",
+ "//pkg/ts/tsutil:tsutil",
"//pkg/ts:ts",
"//pkg/ts:ts_test",
"//pkg/ui:ui",
@@ -3194,6 +3195,7 @@ GET_X_DATA_TARGETS = [
"//pkg/ts/catalog:get_x_data",
"//pkg/ts/testmodel:get_x_data",
"//pkg/ts/tspb:get_x_data",
+ "//pkg/ts/tsutil:get_x_data",
"//pkg/ui:get_x_data",
"//pkg/upgrade:get_x_data",
"//pkg/upgrade/migrationstable:get_x_data",
diff --git a/pkg/ccl/serverccl/statusccl/tenant_status_test.go b/pkg/ccl/serverccl/statusccl/tenant_status_test.go
index 9a195e735ebf..6c7bcff949f6 100644
--- a/pkg/ccl/serverccl/statusccl/tenant_status_test.go
+++ b/pkg/ccl/serverccl/statusccl/tenant_status_test.go
@@ -343,11 +343,9 @@ func TestTenantCannotSeeNonTenantStats(t *testing.T) {
require.NoError(t, err)
request := &serverpb.StatementsRequest{}
- combinedStatsRequest := &serverpb.CombinedStatementsStatsRequest{}
var tenantStats *serverpb.StatementsResponse
- var tenantCombinedStats *serverpb.StatementsResponse
- // Populate `tenantStats` and `tenantCombinedStats`. The tenant server
+ // Populate `tenantStats`. The tenant server
// `Statements` and `CombinedStatements` methods are backed by the
// sqlinstance system which uses a cache populated through rangefeed
// for keeping track of SQL pod data. We use `SucceedsSoon` to eliminate
@@ -362,10 +360,6 @@ func TestTenantCannotSeeNonTenantStats(t *testing.T) {
return errors.New("tenant statements are unexpectedly empty")
}
- tenantCombinedStats, err = tenantStatusServer.CombinedStatementStats(ctx, combinedStatsRequest)
- if tenantCombinedStats == nil || len(tenantCombinedStats.Statements) == 0 {
- return errors.New("tenant combined statements are unexpectedly empty")
- }
return nil
})
@@ -374,11 +368,6 @@ func TestTenantCannotSeeNonTenantStats(t *testing.T) {
err = serverutils.GetJSONProto(nonTenant, path, &nonTenantStats)
require.NoError(t, err)
- path = "/_status/combinedstmts"
- var nonTenantCombinedStats serverpb.StatementsResponse
- err = serverutils.GetJSONProto(nonTenant, path, &nonTenantCombinedStats)
- require.NoError(t, err)
-
checkStatements := func(t *testing.T, tc []testCase, actual *serverpb.StatementsResponse) {
t.Helper()
var expectedStatements []string
@@ -414,13 +403,11 @@ func TestTenantCannotSeeNonTenantStats(t *testing.T) {
// First we verify that we have expected stats from tenants.
t.Run("tenant-stats", func(t *testing.T) {
checkStatements(t, testCaseTenant, tenantStats)
- checkStatements(t, testCaseTenant, tenantCombinedStats)
})
// Now we verify the non tenant stats are what we expected.
t.Run("non-tenant-stats", func(t *testing.T) {
checkStatements(t, testCaseNonTenant, &nonTenantStats)
- checkStatements(t, testCaseNonTenant, &nonTenantCombinedStats)
})
// Now we verify that tenant and non-tenant have no visibility into each other's stats.
@@ -437,17 +424,6 @@ func TestTenantCannotSeeNonTenantStats(t *testing.T) {
}
}
- for _, tenantStmt := range tenantCombinedStats.Statements {
- for _, nonTenantStmt := range nonTenantCombinedStats.Statements {
- require.NotEqual(t, tenantStmt, nonTenantStmt, "expected tenant to have no visibility to non-tenant's statement stats, but found:", nonTenantStmt)
- }
- }
-
- for _, tenantTxn := range tenantCombinedStats.Transactions {
- for _, nonTenantTxn := range nonTenantCombinedStats.Transactions {
- require.NotEqual(t, tenantTxn, nonTenantTxn, "expected tenant to have no visibility to non-tenant's transaction stats, but found:", nonTenantTxn)
- }
- }
})
}
@@ -463,43 +439,46 @@ func testResetSQLStatsRPCForTenant(
testCluster := testHelper.TestCluster()
controlCluster := testHelper.ControlCluster()
- // Disable automatic flush to ensure tests are deterministic.
+ // Set automatic flush to some long duration we'll never hit to
+ // ensure tests are deterministic.
testCluster.TenantConn(0 /* idx */).
- Exec(t, "SET CLUSTER SETTING sql.stats.flush.enabled = false")
+ Exec(t, "SET CLUSTER SETTING sql.stats.flush.interval = '24h'")
controlCluster.TenantConn(0 /* idx */).
- Exec(t, "SET CLUSTER SETTING sql.stats.flush.enabled = false")
+ Exec(t, "SET CLUSTER SETTING sql.stats.flush.interval = '24h'")
defer func() {
// Cleanup
testCluster.TenantConn(0 /* idx */).
- Exec(t, "SET CLUSTER SETTING sql.stats.flush.enabled = true")
+ Exec(t, "SET CLUSTER SETTING sql.stats.flush.interval = '10m'")
controlCluster.TenantConn(0 /* idx */).
- Exec(t, "SET CLUSTER SETTING sql.stats.flush.enabled = true")
+ Exec(t, "SET CLUSTER SETTING sql.stats.flush.interval = '10m'")
}()
for _, flushed := range []bool{false, true} {
+ testTenant := testCluster.Tenant(serverccl.RandomServer)
+ testTenantConn := testTenant.GetTenantConn()
t.Run(fmt.Sprintf("flushed=%t", flushed), func(t *testing.T) {
// Clears the SQL Stats at the end of each test via builtin.
defer func() {
- testCluster.TenantConn(serverccl.RandomServer).Exec(t, "SELECT crdb_internal.reset_sql_stats()")
+ testTenantConn.Exec(t, "SELECT crdb_internal.reset_sql_stats()")
controlCluster.TenantConn(serverccl.RandomServer).Exec(t, "SELECT crdb_internal.reset_sql_stats()")
}()
for _, stmt := range stmts {
- testCluster.TenantConn(serverccl.RandomServer).Exec(t, stmt)
+ testTenantConn.Exec(t, stmt)
controlCluster.TenantConn(serverccl.RandomServer).Exec(t, stmt)
}
if flushed {
- testCluster.TenantSQLStats(serverccl.RandomServer).Flush(ctx)
+ testTenant.TenantSQLStats().Flush(ctx)
controlCluster.TenantSQLStats(serverccl.RandomServer).Flush(ctx)
}
- status := testCluster.TenantStatusSrv(serverccl.RandomServer)
+ status := testTenant.TenantStatusSrv()
statsPreReset, err := status.Statements(ctx, &serverpb.StatementsRequest{
- Combined: true,
+ Combined: flushed,
})
require.NoError(t, err)
@@ -513,7 +492,7 @@ func testResetSQLStatsRPCForTenant(
require.NoError(t, err)
statsPostReset, err := status.Statements(ctx, &serverpb.StatementsRequest{
- Combined: true,
+ Combined: flushed,
})
require.NoError(t, err)
@@ -538,7 +517,7 @@ func testResetSQLStatsRPCForTenant(
// Ensures that sql stats reset is isolated by tenant boundary.
statsFromControlCluster, err :=
controlCluster.TenantStatusSrv(serverccl.RandomServer).Statements(ctx, &serverpb.StatementsRequest{
- Combined: true,
+ Combined: flushed,
})
require.NoError(t, err)
diff --git a/pkg/cli/BUILD.bazel b/pkg/cli/BUILD.bazel
index 12b99c0d1e0f..8ee3fbb49330 100644
--- a/pkg/cli/BUILD.bazel
+++ b/pkg/cli/BUILD.bazel
@@ -179,6 +179,7 @@ go_library(
"//pkg/testutils/serverutils",
"//pkg/ts",
"//pkg/ts/tspb",
+ "//pkg/ts/tsutil",
"//pkg/upgrade/upgrades",
"//pkg/util",
"//pkg/util/cgroups",
diff --git a/pkg/cli/decode_test.go b/pkg/cli/decode_test.go
index f3ddf9a3602a..b3475baaf481 100644
--- a/pkg/cli/decode_test.go
+++ b/pkg/cli/decode_test.go
@@ -124,7 +124,7 @@ func TestTryDecodeValue(t *testing.T) {
s: `\u0008\u0080ì¿ùÛ\u008bù\u0083\u0017\u0010\u0080¬¢ÿ¾ôù\u0083\u0017\u001a \n\u0018cr.node.sql.select.count\u0010\u0001\u0018\u0002 \u0002\u001a \n\u0018cr.node.sql.update.count\u0010\u0001\u0018\u0002 \u0002\u001a \n\u0018cr.node.sql.insert.count\u0010\u0001\u0018\u0002 \u0002\u001a \n\u0018cr.node.sql.delete.count\u0010\u0001\u0018\u0002 \u0002\u001a*\n\u001fcr.node.sql.service.latency-p99\u0010\u0003\u0018\u0002 \u0000*\u00011\u001a3\n+cr.node.sql.distsql.contended_queries.count\u0010\u0001\u0018\u0002 \u0002\u001a\u001c\n\u0011cr.store.replicas\u0010\u0001\u0018\u0002 \u0000*\u00011\u001a\u0019\n\u0011cr.store.capacity\u0010\u0001\u0018\u0002 \u0000\u001a#\n\u001bcr.store.capacity.available\u0010\u0001\u0018\u0002 \u0000\u001a\u001e\n\u0016cr.store.capacity.used\u0010\u0001\u0018\u0002 \u0000 \u0080Ø\u008eáo`,
wantOK: true,
protoType: "cockroach.ts.tspb.TimeSeriesQueryRequest",
- wantVal: `{"endNanos": "1659549679000000000", "queries": [{"derivative": "NON_NEGATIVE_DERIVATIVE", "downsampler": "AVG", "name": "cr.node.sql.select.count", "sourceAggregator": "SUM", "sources": []}, {"derivative": "NON_NEGATIVE_DERIVATIVE", "downsampler": "AVG", "name": "cr.node.sql.update.count", "sourceAggregator": "SUM", "sources": []}, {"derivative": "NON_NEGATIVE_DERIVATIVE", "downsampler": "AVG", "name": "cr.node.sql.insert.count", "sourceAggregator": "SUM", "sources": []}, {"derivative": "NON_NEGATIVE_DERIVATIVE", "downsampler": "AVG", "name": "cr.node.sql.delete.count", "sourceAggregator": "SUM", "sources": []}, {"derivative": "NONE", "downsampler": "MAX", "name": "cr.node.sql.service.latency-p99", "sourceAggregator": "SUM", "sources": ["1"]}, {"derivative": "NON_NEGATIVE_DERIVATIVE", "downsampler": "AVG", "name": "cr.node.sql.distsql.contended_queries.count", "sourceAggregator": "SUM", "sources": []}, {"derivative": "NONE", "downsampler": "AVG", "name": "cr.store.replicas", "sourceAggregator": "SUM", "sources": ["1"]}, {"derivative": "NONE", "downsampler": "AVG", "name": "cr.store.capacity", "sourceAggregator": "SUM", "sources": []}, {"derivative": "NONE", "downsampler": "AVG", "name": "cr.store.capacity.available", "sourceAggregator": "SUM", "sources": []}, {"derivative": "NONE", "downsampler": "AVG", "name": "cr.store.capacity.used", "sourceAggregator": "SUM", "sources": []}], "sampleNanos": "30000000000", "startNanos": "1659546079000000000"}`,
+ wantVal: `{"endNanos": "1659549679000000000", "queries": [{"derivative": "NON_NEGATIVE_DERIVATIVE", "downsampler": "AVG", "name": "cr.node.sql.select.count", "sourceAggregator": "SUM", "sources": [], "tenantId": {"id": "0"}}, {"derivative": "NON_NEGATIVE_DERIVATIVE", "downsampler": "AVG", "name": "cr.node.sql.update.count", "sourceAggregator": "SUM", "sources": [], "tenantId": {"id": "0"}}, {"derivative": "NON_NEGATIVE_DERIVATIVE", "downsampler": "AVG", "name": "cr.node.sql.insert.count", "sourceAggregator": "SUM", "sources": [], "tenantId": {"id": "0"}}, {"derivative": "NON_NEGATIVE_DERIVATIVE", "downsampler": "AVG", "name": "cr.node.sql.delete.count", "sourceAggregator": "SUM", "sources": [], "tenantId": {"id": "0"}}, {"derivative": "NONE", "downsampler": "MAX", "name": "cr.node.sql.service.latency-p99", "sourceAggregator": "SUM", "sources": ["1"], "tenantId": {"id": "0"}}, {"derivative": "NON_NEGATIVE_DERIVATIVE", "downsampler": "AVG", "name": "cr.node.sql.distsql.contended_queries.count", "sourceAggregator": "SUM", "sources": [], "tenantId": {"id": "0"}}, {"derivative": "NONE", "downsampler": "AVG", "name": "cr.store.replicas", "sourceAggregator": "SUM", "sources": ["1"], "tenantId": {"id": "0"}}, {"derivative": "NONE", "downsampler": "AVG", "name": "cr.store.capacity", "sourceAggregator": "SUM", "sources": [], "tenantId": {"id": "0"}}, {"derivative": "NONE", "downsampler": "AVG", "name": "cr.store.capacity.available", "sourceAggregator": "SUM", "sources": [], "tenantId": {"id": "0"}}, {"derivative": "NONE", "downsampler": "AVG", "name": "cr.store.capacity.used", "sourceAggregator": "SUM", "sources": [], "tenantId": {"id": "0"}}], "sampleNanos": "30000000000", "startNanos": "1659546079000000000"}`,
},
}
for _, tt := range tests {
diff --git a/pkg/cli/tsdump.go b/pkg/cli/tsdump.go
index e4db0d3ed184..9074e66673da 100644
--- a/pkg/cli/tsdump.go
+++ b/pkg/cli/tsdump.go
@@ -20,8 +20,8 @@ import (
"time"
"github.com/cockroachdb/cockroach/pkg/cli/clierrorplus"
- "github.com/cockroachdb/cockroach/pkg/ts"
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
+ "github.com/cockroachdb/cockroach/pkg/ts/tsutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/spf13/cobra"
@@ -74,7 +74,7 @@ output.
// Buffer the writes to os.Stdout since we're going to
// be writing potentially a lot of data to it.
w := bufio.NewWriter(os.Stdout)
- if err := ts.DumpRawTo(stream, w); err != nil {
+ if err := tsutil.DumpRawTo(stream, w); err != nil {
return err
}
return w.Flush()
diff --git a/pkg/rpc/BUILD.bazel b/pkg/rpc/BUILD.bazel
index 8ff612d1b687..f964671d271a 100644
--- a/pkg/rpc/BUILD.bazel
+++ b/pkg/rpc/BUILD.bazel
@@ -37,6 +37,7 @@ go_library(
"//pkg/security/username",
"//pkg/settings",
"//pkg/settings/cluster",
+ "//pkg/ts/tspb",
"//pkg/util",
"//pkg/util/buildutil",
"//pkg/util/contextutil",
@@ -116,6 +117,7 @@ go_test(
"//pkg/spanconfig",
"//pkg/testutils",
"//pkg/testutils/skip",
+ "//pkg/ts/tspb",
"//pkg/util",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
diff --git a/pkg/rpc/auth_tenant.go b/pkg/rpc/auth_tenant.go
index 6f447503c4e7..81c623bf02cf 100644
--- a/pkg/rpc/auth_tenant.go
+++ b/pkg/rpc/auth_tenant.go
@@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
+ "github.com/cockroachdb/cockroach/pkg/ts/tspb"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
@@ -155,7 +156,7 @@ func (a tenantAuthorizer) authorize(
return a.capabilitiesAuthorizer.HasNodeStatusCapability(ctx, tenID)
case "/cockroach.ts.tspb.TimeSeries/Query":
- return a.capabilitiesAuthorizer.HasTSDBQueryCapability(ctx, tenID)
+ return a.authTSDBQuery(ctx, tenID, req.(*tspb.TimeSeriesQueryRequest))
default:
return authErrorf("unknown method %q", fullMethod)
@@ -379,6 +380,26 @@ func (a tenantAuthorizer) authSpanConfigConformance(
return nil
}
+// authTSDBQuery authorizes the provided tenant to invoke the TSDB Query RPC
+// with the provided args. A non-system tenant is only allowed to query its own
+// time series.
+func (a tenantAuthorizer) authTSDBQuery(
+ ctx context.Context, id roachpb.TenantID, request *tspb.TimeSeriesQueryRequest,
+) error {
+ for _, query := range request.Queries {
+ if !query.TenantID.IsSet() {
+ return authError("tsdb query with unspecified tenant not permitted")
+ }
+ if !query.TenantID.Equal(id) {
+ return authErrorf("tsdb query with invalid tenant not permitted")
+ }
+ }
+ if err := a.capabilitiesAuthorizer.HasTSDBQueryCapability(ctx, id); err != nil {
+ return authError(err.Error())
+ }
+ return nil
+}
+
// validateSpanConfigTarget validates that the tenant is authorized to interact
// with the supplied span config target. In particular, span targets must be
// wholly contained within the tenant keyspace and system span config targets
diff --git a/pkg/rpc/auth_test.go b/pkg/rpc/auth_test.go
index 3e82dbc4f1a7..1f10012759d1 100644
--- a/pkg/rpc/auth_test.go
+++ b/pkg/rpc/auth_test.go
@@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
+ "github.com/cockroachdb/cockroach/pkg/ts/tspb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
@@ -300,6 +301,16 @@ func TestTenantAuthRequest(t *testing.T) {
}
}
+ makeTimeseriesQueryReq := func(tenantID roachpb.TenantID) *tspb.TimeSeriesQueryRequest {
+ return &tspb.TimeSeriesQueryRequest{
+ Queries: []tspb.Query{
+ {
+ TenantID: tenantID,
+ },
+ },
+ }
+ }
+
const noError = ""
for method, tests := range map[string][]struct {
req interface{}
@@ -817,6 +828,20 @@ func TestTenantAuthRequest(t *testing.T) {
expErr: `requested key span /Tenant/{10a-20b} not fully contained in tenant keyspace /Tenant/1{0-1}`,
},
},
+ "/cockroach.ts.tspb.TimeSeries/Query": {
+ {
+ req: makeTimeseriesQueryReq(tenID),
+ expErr: noError,
+ },
+ {
+ req: makeTimeseriesQueryReq(roachpb.TenantID{}),
+ expErr: `tsdb query with unspecified tenant not permitted`,
+ },
+ {
+ req: makeTimeseriesQueryReq(roachpb.MustMakeTenantID(2)),
+ expErr: `tsdb query with invalid tenant not permitted`,
+ },
+ },
"/cockroach.rpc.Heartbeat/Ping": {
{req: &rpc.PingRequest{}, expErr: noError},
@@ -850,6 +875,16 @@ func TestTenantAuthRequest(t *testing.T) {
func TestTenantAuthCapabilityChecks(t *testing.T) {
defer leaktest.AfterTest(t)()
+ makeTimeseriesQueryReq := func(tenantID roachpb.TenantID) *tspb.TimeSeriesQueryRequest {
+ return &tspb.TimeSeriesQueryRequest{
+ Queries: []tspb.Query{
+ {
+ TenantID: tenantID,
+ },
+ },
+ }
+ }
+
tenID := roachpb.MustMakeTenantID(10)
for method, tests := range map[string][]struct {
req interface{}
@@ -876,6 +911,22 @@ func TestTenantAuthCapabilityChecks(t *testing.T) {
expErr: "tenant does not have capability",
},
},
+ "/cockroach.ts.tspb.TimeSeries/Query": {
+ {
+ req: makeTimeseriesQueryReq(tenID),
+ configureAuthorizer: func(authorizer *mockAuthorizer) {
+ authorizer.hasTSDBQueryCapability = true
+ },
+ expErr: "",
+ },
+ {
+ req: makeTimeseriesQueryReq(tenID),
+ configureAuthorizer: func(authorizer *mockAuthorizer) {
+ authorizer.hasTSDBQueryCapability = false
+ },
+ expErr: "tenant does not have capability",
+ },
+ },
} {
ctx := context.Background()
for _, tc := range tests {
diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index 1ea8f52ce306..1660236b0b00 100644
--- a/pkg/server/BUILD.bazel
+++ b/pkg/server/BUILD.bazel
@@ -505,6 +505,7 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/ts",
"//pkg/ts/tspb",
+ "//pkg/ts/tsutil",
"//pkg/ui",
"//pkg/upgrade",
"//pkg/upgrade/upgradebase",
diff --git a/pkg/server/combined_statement_stats.go b/pkg/server/combined_statement_stats.go
index 1adae4c05502..18d65a9d633f 100644
--- a/pkg/server/combined_statement_stats.go
+++ b/pkg/server/combined_statement_stats.go
@@ -67,18 +67,36 @@ func getCombinedStatementStats(
settings *cluster.Settings,
testingKnobs *sqlstats.TestingKnobs,
) (*serverpb.StatementsResponse, error) {
- startTime := getTimeFromSeconds(req.Start)
- endTime := getTimeFromSeconds(req.End)
- limit := SQLStatsResponseMax.Get(&settings.SV)
showInternal := SQLStatsShowInternal.Get(&settings.SV)
whereClause, orderAndLimit, args := getCombinedStatementsQueryClausesAndArgs(
- startTime, endTime, limit, testingKnobs, showInternal)
- statements, err := collectCombinedStatements(ctx, ie, whereClause, args, orderAndLimit)
+ req, testingKnobs, showInternal, settings)
+
+ var statements []serverpb.StatementsResponse_CollectedStatementStatistics
+ var transactions []serverpb.StatementsResponse_ExtendedCollectedTransactionStatistics
+ var err error
+
+ if req.FetchMode == nil || req.FetchMode.StatsType == serverpb.CombinedStatementsStatsRequest_TxnStatsOnly {
+ transactions, err = collectCombinedTransactions(ctx, ie, whereClause, args, orderAndLimit, testingKnobs)
+ if err != nil {
+ return nil, serverError(ctx, err)
+ }
+ }
+
+ if req.FetchMode != nil && req.FetchMode.StatsType == serverpb.CombinedStatementsStatsRequest_TxnStatsOnly {
+		// Change the whereClause for the statements query to select only the
+		// statements matching the txn_fingerprint_ids in the transactions response
+		// within the requested interval. The order and limit clauses are no longer needed.
+ orderAndLimit = ""
+ whereClause, args = buildWhereClauseForStmtsByTxn(req, transactions, testingKnobs)
+ }
+
+ statements, err = collectCombinedStatements(ctx, ie, whereClause, args, orderAndLimit, testingKnobs)
if err != nil {
return nil, serverError(ctx, err)
}
- transactions, err := collectCombinedTransactions(ctx, ie, whereClause, args, orderAndLimit)
+ stmtsRunTime, txnsRunTime, err := getTotalRuntimeSecs(ctx, req, ie, testingKnobs)
+
if err != nil {
return nil, serverError(ctx, err)
}
@@ -88,11 +106,188 @@ func getCombinedStatementStats(
Transactions: transactions,
LastReset: statsProvider.GetLastReset(),
InternalAppNamePrefix: catconstants.InternalAppNamePrefix,
+ StmtsTotalRuntimeSecs: stmtsRunTime,
+ TxnsTotalRuntimeSecs: txnsRunTime,
}
return response, nil
}
+func getTotalRuntimeSecs(
+ ctx context.Context,
+ req *serverpb.CombinedStatementsStatsRequest,
+ ie *sql.InternalExecutor,
+ testingKnobs *sqlstats.TestingKnobs,
+) (stmtsRuntime float32, txnsRuntime float32, err error) {
+ var buffer strings.Builder
+ buffer.WriteString(testingKnobs.GetAOSTClause())
+ var args []interface{}
+ startTime := getTimeFromSeconds(req.Start)
+ endTime := getTimeFromSeconds(req.End)
+
+ buffer.WriteString(" WHERE true")
+
+ if startTime != nil {
+ args = append(args, *startTime)
+ buffer.WriteString(fmt.Sprintf(" AND aggregated_ts >= $%d", len(args)))
+ }
+
+ if endTime != nil {
+ args = append(args, *endTime)
+ buffer.WriteString(fmt.Sprintf(" AND aggregated_ts <= $%d", len(args)))
+ }
+
+ whereClause := buffer.String()
+
+ queryWithPlaceholders := `
+SELECT
+COALESCE(
+ sum(
+ (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::FLOAT *
+ (statistics-> 'statistics' ->> 'cnt')::FLOAT
+ )
+, 0)
+FROM crdb_internal.%s_statistics_persisted
+%s
+`
+
+ getRuntime := func(table string) (float32, error) {
+ it, err := ie.QueryIteratorEx(
+ ctx,
+ fmt.Sprintf(`%s-total-runtime`, table),
+ nil,
+ sessiondata.NodeUserSessionDataOverride,
+ fmt.Sprintf(queryWithPlaceholders, table, whereClause),
+ args...)
+
+ if err != nil {
+ return 0, err
+ }
+
+ defer func() {
+ closeErr := it.Close()
+ if closeErr != nil {
+ err = errors.CombineErrors(err, closeErr)
+ }
+ }()
+
+ ok, err := it.Next(ctx)
+ if err != nil {
+ return 0, err
+ }
+
+ if !ok {
+ return 0, errors.New("expected one row but got none")
+ }
+
+ var row tree.Datums
+ if row = it.Cur(); row == nil {
+ return 0, errors.New("unexpected null row")
+ }
+
+ return float32(tree.MustBeDFloat(row[0])), nil
+
+ }
+
+ if req.FetchMode == nil || req.FetchMode.StatsType != serverpb.CombinedStatementsStatsRequest_TxnStatsOnly {
+ stmtsRuntime, err = getRuntime("statement")
+ if err != nil {
+ return 0, 0, err
+ }
+ }
+
+ if req.FetchMode == nil || req.FetchMode.StatsType != serverpb.CombinedStatementsStatsRequest_StmtStatsOnly {
+ txnsRuntime, err = getRuntime("transaction")
+ if err != nil {
+ return 0, 0, err
+ }
+ }
+
+ return stmtsRuntime, txnsRuntime, err
+}
+
+// Common stmt and txn columns to sort on.
+const (
+ sortSvcLatDesc = `(statistics -> 'statistics' -> 'svcLat' ->> 'mean')::FLOAT DESC`
+ sortCPUTimeDesc = `(statistics -> 'statistics' -> 'execution_statistics' -> 'cpuSQLNanos' ->> 'mean')::FLOAT DESC`
+ sortExecCountDesc = `(statistics -> 'statistics' ->> 'cnt')::INT DESC`
+ sortContentionTimeDesc = `(statistics -> 'execution_statistics' -> 'contentionTime' ->> 'mean')::FLOAT DESC`
+ sortPCTRuntimeDesc = `((statistics -> 'statistics' -> 'svcLat' ->> 'mean')::FLOAT *
+ (statistics -> 'statistics' ->> 'cnt')::FLOAT) DESC`
+)
+
+func getStmtColumnFromSortOption(sort serverpb.StatsSortOptions) string {
+ switch sort {
+ case serverpb.StatsSortOptions_SERVICE_LAT:
+ return sortSvcLatDesc
+ case serverpb.StatsSortOptions_CPU_TIME:
+ return sortCPUTimeDesc
+ case serverpb.StatsSortOptions_EXECUTION_COUNT:
+ return sortExecCountDesc
+ case serverpb.StatsSortOptions_P99_STMTS_ONLY:
+ return `(statistics -> 'statistics' -> 'latencyInfo' ->> 'p99')::FLOAT DESC`
+ case serverpb.StatsSortOptions_CONTENTION_TIME:
+ return sortContentionTimeDesc
+ default:
+ return sortSvcLatDesc
+ }
+}
+
+func getTxnColumnFromSortOption(sort serverpb.StatsSortOptions) string {
+ switch sort {
+ case serverpb.StatsSortOptions_SERVICE_LAT:
+ return sortSvcLatDesc
+ case serverpb.StatsSortOptions_CPU_TIME:
+ return sortCPUTimeDesc
+ case serverpb.StatsSortOptions_EXECUTION_COUNT:
+ return sortExecCountDesc
+ case serverpb.StatsSortOptions_CONTENTION_TIME:
+ return sortContentionTimeDesc
+ case serverpb.StatsSortOptions_PCT_RUNTIME:
+ return sortPCTRuntimeDesc
+ default:
+ return sortSvcLatDesc
+ }
+}
+
+// buildWhereClauseForStmtsByTxn builds the where clause to get the statement
+// stats based on a list of transactions. The list of transactions provided must
+// contain no duplicate transaction fingerprint ids.
+func buildWhereClauseForStmtsByTxn(
+ req *serverpb.CombinedStatementsStatsRequest,
+ transactions []serverpb.StatementsResponse_ExtendedCollectedTransactionStatistics,
+ testingKnobs *sqlstats.TestingKnobs,
+) (whereClause string, args []interface{}) {
+ var buffer strings.Builder
+ buffer.WriteString(testingKnobs.GetAOSTClause())
+
+ buffer.WriteString(" WHERE true")
+
+ // Add start and end filters from request.
+ startTime := getTimeFromSeconds(req.Start)
+ endTime := getTimeFromSeconds(req.End)
+ if startTime != nil {
+ args = append(args, *startTime)
+ buffer.WriteString(fmt.Sprintf(" AND aggregated_ts >= $%d", len(args)))
+ }
+
+ if endTime != nil {
+ args = append(args, *endTime)
+ buffer.WriteString(fmt.Sprintf(" AND aggregated_ts <= $%d", len(args)))
+ }
+
+ txnFingerprints := make([]string, 0, len(transactions))
+ for i := range transactions {
+ fingerprint := uint64(transactions[i].StatsData.TransactionFingerprintID)
+ txnFingerprints = append(txnFingerprints, fmt.Sprintf("\\x%016x", fingerprint))
+ }
+
+ args = append(args, txnFingerprints)
+ buffer.WriteString(fmt.Sprintf(" AND transaction_fingerprint_id = any $%d", len(args)))
+
+ return buffer.String(), args
+}
+
// getCombinedStatementsQueryClausesAndArgs returns:
// - where clause (filtering by name and aggregates_ts when defined)
// - order and limit clause
@@ -100,7 +295,10 @@ func getCombinedStatementStats(
// The whereClause will be in the format `WHERE A = $1 AND B = $2` and
// args will return the list of arguments in order that will replace the actual values.
func getCombinedStatementsQueryClausesAndArgs(
- start, end *time.Time, limit int64, testingKnobs *sqlstats.TestingKnobs, showInternal bool,
+ req *serverpb.CombinedStatementsStatsRequest,
+ testingKnobs *sqlstats.TestingKnobs,
+ showInternal bool,
+ settings *cluster.Settings,
) (whereClause string, orderAndLimitClause string, args []interface{}) {
var buffer strings.Builder
buffer.WriteString(testingKnobs.GetAOSTClause())
@@ -115,17 +313,37 @@ func getCombinedStatementsQueryClausesAndArgs(
catconstants.DelegatedAppNamePrefix))
}
- if start != nil {
+ // Add start and end filters from request.
+ startTime := getTimeFromSeconds(req.Start)
+ endTime := getTimeFromSeconds(req.End)
+ if startTime != nil {
buffer.WriteString(" AND aggregated_ts >= $1")
- args = append(args, *start)
+ args = append(args, *startTime)
}
- if end != nil {
- args = append(args, *end)
+ if endTime != nil {
+ args = append(args, *endTime)
buffer.WriteString(fmt.Sprintf(" AND aggregated_ts <= $%d", len(args)))
}
+
+ // Add LIMIT from request.
+ limit := req.Limit
+ if limit == 0 {
+ limit = SQLStatsResponseMax.Get(&settings.SV)
+ }
args = append(args, limit)
- orderAndLimitClause = fmt.Sprintf(` ORDER BY aggregated_ts DESC LIMIT $%d`, len(args))
+
+ // Determine sort column.
+ var col string
+ if req.FetchMode == nil {
+ col = "fingerprint_id"
+ } else if req.FetchMode.StatsType == serverpb.CombinedStatementsStatsRequest_StmtStatsOnly {
+ col = getStmtColumnFromSortOption(req.FetchMode.Sort)
+ } else if req.FetchMode.StatsType == serverpb.CombinedStatementsStatsRequest_TxnStatsOnly {
+ col = getTxnColumnFromSortOption(req.FetchMode.Sort)
+ }
+
+ orderAndLimitClause = fmt.Sprintf(` ORDER BY %s LIMIT $%d`, col, len(args))
return buffer.String(), orderAndLimitClause, args
}
@@ -136,23 +354,26 @@ func collectCombinedStatements(
whereClause string,
args []interface{},
orderAndLimit string,
+ testingKnobs *sqlstats.TestingKnobs,
) ([]serverpb.StatementsResponse_CollectedStatementStatistics, error) {
-
- query := fmt.Sprintf(
- `SELECT
- fingerprint_id,
- transaction_fingerprint_id,
- app_name,
- max(aggregated_ts) as aggregated_ts,
- metadata,
- crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics
- FROM crdb_internal.statement_statistics %s
- GROUP BY
- fingerprint_id,
- transaction_fingerprint_id,
- app_name,
- metadata
- %s`, whereClause, orderAndLimit)
+ aostClause := testingKnobs.GetAOSTClause()
+ query := fmt.Sprintf(`
+SELECT * FROM (
+SELECT
+ fingerprint_id,
+ transaction_fingerprint_id,
+ app_name,
+ max(aggregated_ts) as aggregated_ts,
+ metadata,
+ crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics
+FROM crdb_internal.statement_statistics_persisted %s
+GROUP BY
+ fingerprint_id,
+ transaction_fingerprint_id,
+ app_name,
+ metadata
+) %s
+%s`, whereClause, aostClause, orderAndLimit)
const expectedNumDatums = 6
@@ -236,21 +457,25 @@ func collectCombinedTransactions(
whereClause string,
args []interface{},
orderAndLimit string,
+ testingKnobs *sqlstats.TestingKnobs,
) ([]serverpb.StatementsResponse_ExtendedCollectedTransactionStatistics, error) {
-
- query := fmt.Sprintf(
- `SELECT
- app_name,
- max(aggregated_ts) as aggregated_ts,
- fingerprint_id,
- metadata,
- crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics
- FROM crdb_internal.transaction_statistics %s
- GROUP BY
- app_name,
- fingerprint_id,
- metadata
- %s`, whereClause, orderAndLimit)
+ aostClause := testingKnobs.GetAOSTClause()
+
+ query := fmt.Sprintf(`
+SELECT * FROM (
+SELECT
+ app_name,
+ max(aggregated_ts) as aggregated_ts,
+ fingerprint_id,
+ metadata,
+ crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics
+FROM crdb_internal.transaction_statistics_persisted %s
+GROUP BY
+ app_name,
+ fingerprint_id,
+ metadata
+) %s
+%s`, whereClause, aostClause, orderAndLimit)
const expectedNumDatums = 5
@@ -463,18 +688,15 @@ func getTotalStatementDetails(
query := fmt.Sprintf(
`SELECT
crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata,
- aggregation_interval,
array_agg(app_name) as app_names,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics,
- max(sampled_plan) as sampled_plan,
encode(fingerprint_id, 'hex') as fingerprint_id
- FROM crdb_internal.statement_statistics %s
+ FROM crdb_internal.statement_statistics_persisted %s
GROUP BY
- aggregation_interval,
fingerprint_id
LIMIT 1`, whereClause)
- const expectedNumDatums = 6
+ const expectedNumDatums = 4
var statement serverpb.StatementDetailsResponse_CollectedStatementSummary
row, err := ie.QueryRowEx(ctx, "combined-stmts-details-total", nil,
@@ -498,27 +720,18 @@ func getTotalStatementDetails(
return statement, serverError(ctx, err)
}
- aggInterval := tree.MustBeDInterval(row[1]).Duration
-
- apps := tree.MustBeDArray(row[2])
+ apps := tree.MustBeDArray(row[1])
var appNames []string
for _, s := range apps.Array {
appNames = util.CombineUniqueString(appNames, []string{string(tree.MustBeDString(s))})
}
aggregatedMetadata.AppNames = appNames
- statsJSON := tree.MustBeDJSON(row[3]).JSON
+ statsJSON := tree.MustBeDJSON(row[2]).JSON
if err = sqlstatsutil.DecodeStmtStatsStatisticsJSON(statsJSON, &statistics.Stats); err != nil {
return statement, serverError(ctx, err)
}
- planJSON := tree.MustBeDJSON(row[4]).JSON
- plan, err := sqlstatsutil.JSONToExplainTreePlanNode(planJSON)
- if err != nil {
- return statement, serverError(ctx, err)
- }
- statistics.Stats.SensitiveInfo.MostRecentPlanDescription = *plan
-
queryTree, err := parser.ParseOne(aggregatedMetadata.Query)
if err != nil {
return statement, serverError(ctx, err)
@@ -528,12 +741,11 @@ func getTotalStatementDetails(
cfg.LineWidth = tree.ConsoleLineWidth
aggregatedMetadata.FormattedQuery = cfg.Pretty(queryTree.AST)
- aggregatedMetadata.FingerprintID = string(tree.MustBeDString(row[5]))
+ aggregatedMetadata.FingerprintID = string(tree.MustBeDString(row[3]))
statement = serverpb.StatementDetailsResponse_CollectedStatementSummary{
- Metadata: aggregatedMetadata,
- AggregationInterval: time.Duration(aggInterval.Nanos()),
- Stats: statistics.Stats,
+ Metadata: aggregatedMetadata,
+ Stats: statistics.Stats,
}
return statement, nil
@@ -553,18 +765,15 @@ func getStatementDetailsPerAggregatedTs(
`SELECT
aggregated_ts,
crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata,
- crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics,
- max(sampled_plan) as sampled_plan,
- aggregation_interval
- FROM crdb_internal.statement_statistics %s
+ crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics
+ FROM crdb_internal.statement_statistics_persisted %s
GROUP BY
- aggregated_ts,
- aggregation_interval
+ aggregated_ts
ORDER BY aggregated_ts ASC
LIMIT $%d`, whereClause, len(args)+1)
args = append(args, limit)
- const expectedNumDatums = 5
+ const expectedNumDatums = 3
it, err := ie.QueryIteratorEx(ctx, "combined-stmts-details-by-aggregated-timestamp", nil,
sessiondata.NodeUserSessionDataOverride, query, args...)
@@ -606,20 +815,10 @@ func getStatementDetailsPerAggregatedTs(
return nil, serverError(ctx, err)
}
- planJSON := tree.MustBeDJSON(row[3]).JSON
- plan, err := sqlstatsutil.JSONToExplainTreePlanNode(planJSON)
- if err != nil {
- return nil, serverError(ctx, err)
- }
- metadata.Stats.SensitiveInfo.MostRecentPlanDescription = *plan
-
- aggInterval := tree.MustBeDInterval(row[4]).Duration
-
stmt := serverpb.StatementDetailsResponse_CollectedStatementGroupedByAggregatedTs{
- AggregatedTs: aggregatedTs,
- AggregationInterval: time.Duration(aggInterval.Nanos()),
- Stats: metadata.Stats,
- Metadata: aggregatedMetadata,
+ AggregatedTs: aggregatedTs,
+ Stats: metadata.Stats,
+ Metadata: aggregatedMetadata,
}
statements = append(statements, stmt)
@@ -709,17 +908,15 @@ func getStatementDetailsPerPlanHash(
(statistics -> 'statistics' -> 'planGists'->>0) as plan_gist,
crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics,
- max(sampled_plan) as sampled_plan,
- aggregation_interval,
index_recommendations
- FROM crdb_internal.statement_statistics %s
+ FROM crdb_internal.statement_statistics_persisted %s
GROUP BY
plan_hash,
plan_gist,
- aggregation_interval,
index_recommendations
LIMIT $%d`, whereClause, len(args)+1)
- expectedNumDatums := 7
+
+ expectedNumDatums := 5
args = append(args, limit)
@@ -771,15 +968,7 @@ func getStatementDetailsPerPlanHash(
return nil, serverError(ctx, err)
}
- planJSON := tree.MustBeDJSON(row[4]).JSON
- plan, err := sqlstatsutil.JSONToExplainTreePlanNode(planJSON)
- if err != nil {
- return nil, serverError(ctx, err)
- }
- metadata.Stats.SensitiveInfo.MostRecentPlanDescription = *plan
- aggInterval := tree.MustBeDInterval(row[5]).Duration
-
- recommendations := tree.MustBeDArray(row[6])
+ recommendations := tree.MustBeDArray(row[4])
var idxRecommendations []string
for _, s := range recommendations.Array {
idxRecommendations = util.CombineUniqueString(idxRecommendations, []string{string(tree.MustBeDString(s))})
@@ -809,7 +998,6 @@ func getStatementDetailsPerPlanHash(
metadata.Stats.Indexes = indexes
stmt := serverpb.StatementDetailsResponse_CollectedStatementGroupedByPlanHash{
- AggregationInterval: time.Duration(aggInterval.Nanos()),
ExplainPlan: explainPlan,
PlanHash: planHash,
Stats: metadata.Stats,
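A small, runnable illustration (invented fingerprint values) of the statement filter built in TxnStatsOnly mode above: transaction fingerprints are hex-encoded into byte-array literals and appended as a trailing `= any` predicate.

```go
package main

import "fmt"

func main() {
	// Invented fingerprints, mirroring the "\\x%016x" encoding used by
	// buildWhereClauseForStmtsByTxn above.
	fingerprints := []uint64{0x1a2b3c4d5e6f7081, 0x2233445566778899}
	encoded := make([]string, 0, len(fingerprints))
	for _, fp := range fingerprints {
		encoded = append(encoded, fmt.Sprintf("\\x%016x", fp))
	}
	fmt.Println(encoded) // [\x1a2b3c4d5e6f7081 \x2233445566778899]
	// With both start and end set, the clause built above is roughly:
	//   " WHERE true AND aggregated_ts >= $1 AND aggregated_ts <= $2
	//     AND transaction_fingerprint_id = any $3"
	// with args [startTime, endTime, encoded].
}
```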
diff --git a/pkg/server/server_import_ts_test.go b/pkg/server/server_import_ts_test.go
index 745c3c0f8111..8f561cedbe09 100644
--- a/pkg/server/server_import_ts_test.go
+++ b/pkg/server/server_import_ts_test.go
@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/ts"
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
+ "github.com/cockroachdb/cockroach/pkg/ts/tsutil"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -83,7 +84,7 @@ func dumpTSNonempty(t *testing.T, cc *grpc.ClientConn, dest string) (bytes int64
f, err := os.Create(dest)
require.NoError(t, err)
- require.NoError(t, ts.DumpRawTo(c, f))
+ require.NoError(t, tsutil.DumpRawTo(c, f))
require.NoError(t, f.Close())
info, err := os.Stat(dest)
require.NoError(t, err)
diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto
index 1321927db770..34cbb8672518 100644
--- a/pkg/server/serverpb/status.proto
+++ b/pkg/server/serverpb/status.proto
@@ -1613,12 +1613,49 @@ message StatementsResponse {
// Transactions is transaction-level statistics for the collection of
// statements in this response.
repeated ExtendedCollectedTransactionStatistics transactions = 5 [(gogoproto.nullable) = false];
+
+ float stmts_total_runtime_secs = 6;
+
+ float txns_total_runtime_secs = 7;
}
+enum StatsSortOptions {
+ SERVICE_LAT = 0;
+ CPU_TIME = 1;
+ EXECUTION_COUNT = 2;
+ P99_STMTS_ONLY = 3;
+ CONTENTION_TIME = 4;
+ PCT_RUNTIME = 5;
+}
message CombinedStatementsStatsRequest {
+ enum StatsType {
+ StmtStatsOnly = 0;
+ TxnStatsOnly = 1;
+ }
+
+ message FetchMode {
+ StatsType stats_type = 1;
+ StatsSortOptions sort = 2;
+ }
+
// Unix time range for aggregated statements.
int64 start = 1 [(gogoproto.nullable) = true];
int64 end = 2 [(gogoproto.nullable) = true];
+
+ // Note that if fetch_mode is set to transactions only, we will also
+ // include the statement statistics for the stmts in the transactions
+ // response. This is more of a hack-y method to get the complete stats
+ // for txns, because in the client we need to fill in some txn stats info
+ // from its stmt stats, such as the query string.
+ //
+ // We prefer this hackier method right now to reduce surface area for backporting
+ // these changes, but in the future we will introduce more endpoints to properly
+ // organize these differing requests.
+ // TODO (xinhaoz) - Split this API into stmts and txns properly instead of using
+ // this param.
+ FetchMode fetch_mode = 5 [(gogoproto.nullable) = true];
+
+ int64 limit = 6;
}
// StatementDetailsRequest requests the details of a Statement, based on its keys.
@@ -2300,6 +2337,7 @@ service Status {
get: "/_status/combinedstmts"
};
}
+
rpc StatementDetails(StatementDetailsRequest) returns (StatementDetailsResponse) {
option (google.api.http) = {
get: "/_status/stmtdetails/{fingerprint_id}"
diff --git a/pkg/server/stats_test.go b/pkg/server/stats_test.go
index 9a95ff1b250d..0a078f67404a 100644
--- a/pkg/server/stats_test.go
+++ b/pkg/server/stats_test.go
@@ -325,7 +325,7 @@ func TestClusterResetSQLStats(t *testing.T) {
}
statsPreReset, err := status.Statements(ctx, &serverpb.StatementsRequest{
- Combined: true,
+ Combined: flushed,
})
require.NoError(t, err)
@@ -339,7 +339,7 @@ func TestClusterResetSQLStats(t *testing.T) {
require.NoError(t, err)
statsPostReset, err := status.Statements(ctx, &serverpb.StatementsRequest{
- Combined: true,
+ Combined: flushed,
})
require.NoError(t, err)
diff --git a/pkg/server/status/BUILD.bazel b/pkg/server/status/BUILD.bazel
index 124e6b65b3fd..18181a395219 100644
--- a/pkg/server/status/BUILD.bazel
+++ b/pkg/server/status/BUILD.bazel
@@ -52,6 +52,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/ts/tspb",
+ "//pkg/ts/tsutil",
"//pkg/util/cgroups",
"//pkg/util/envutil",
"//pkg/util/goschedstats",
diff --git a/pkg/server/status/recorder.go b/pkg/server/status/recorder.go
index 756a93834f06..72cf79727044 100644
--- a/pkg/server/status/recorder.go
+++ b/pkg/server/status/recorder.go
@@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
+ "github.com/cockroachdb/cockroach/pkg/ts/tsutil"
"github.com/cockroachdb/cockroach/pkg/util/cgroups"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -331,22 +332,33 @@ func (mr *MetricsRecorder) GetTimeSeriesData() []tspb.TimeSeriesData {
lastDataCount := atomic.LoadInt64(&mr.lastDataCount)
data := make([]tspb.TimeSeriesData, 0, lastDataCount)
- // Record time series from node-level registries.
+ // Record time series from node-level registries for system tenant.
now := mr.clock.Now()
recorder := registryRecorder{
registry: mr.mu.nodeRegistry,
format: nodeTimeSeriesPrefix,
- source: strconv.FormatInt(int64(mr.mu.desc.NodeID), 10),
+ source: mr.mu.desc.NodeID.String(),
timestampNanos: now.UnixNano(),
}
recorder.record(&data)
+ // Record time series from node-level registries for secondary tenants.
+ for tenantID, r := range mr.mu.tenantRegistries {
+ tenantRecorder := registryRecorder{
+ registry: r,
+ format: nodeTimeSeriesPrefix,
+ source: tsutil.MakeTenantSource(mr.mu.desc.NodeID.String(), tenantID.String()),
+ timestampNanos: now.UnixNano(),
+ }
+ tenantRecorder.record(&data)
+ }
+
// Record time series from store-level registries.
for storeID, r := range mr.mu.storeRegistries {
storeRecorder := registryRecorder{
registry: r,
format: storeTimeSeriesPrefix,
- source: strconv.FormatInt(int64(storeID), 10),
+ source: storeID.String(),
timestampNanos: now.UnixNano(),
}
storeRecorder.record(&data)
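A brief sketch of the tenant-scoped source naming used above, assuming `tsutil.MakeTenantSource` joins the node and tenant IDs with a dash, as the `recorder_test.go` expectations below (`"1"` vs `"1-123"`) suggest.

```go
// Assumption: MakeTenantSource("1", "123") yields "1-123", matching the
// expectations in recorder_test.go below.
systemSource := "1"                                 // system-tenant node series keep the bare node ID
tenantSource := tsutil.MakeTenantSource("1", "123") // per-tenant node series get a node-tenant source
_, _ = systemSource, tenantSource
```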
diff --git a/pkg/server/status/recorder_test.go b/pkg/server/status/recorder_test.go
index 60975b2f3514..c9bbc7a77a71 100644
--- a/pkg/server/status/recorder_test.go
+++ b/pkg/server/status/recorder_test.go
@@ -136,6 +136,10 @@ func TestMetricsRecorderTenants(t *testing.T) {
)
recorderTenant.AddNode(regTenant, nodeDescTenant, 50, "foo:26257", "foo:26258", "foo:5432")
+ // ========================================
+ // Verify that the recorder exports metrics for tenants as text.
+ // ========================================
+
g := metric.NewGauge(metric.Metadata{Name: "some_metric"})
reg1.AddMetric(g)
g.Update(123)
@@ -178,6 +182,63 @@ func TestMetricsRecorderTenants(t *testing.T) {
require.NotContains(t, bufTenant.String(), `some_metric{tenant="system"} 123`)
require.Contains(t, bufTenant.String(), `some_metric{tenant="application2"} 456`)
+ // ========================================
+ // Verify that the recorder processes tenant time series registries
+ // ========================================
+
+ expectedData := []tspb.TimeSeriesData{
+ // System tenant metrics
+ {
+ Name: "cr.node.node-id",
+ Source: "1",
+ Datapoints: []tspb.TimeSeriesDatapoint{
+ {
+ TimestampNanos: manual.Now().UnixNano(),
+ Value: float64(1),
+ },
+ },
+ },
+ {
+ Name: "cr.node.some_metric",
+ Source: "1",
+ Datapoints: []tspb.TimeSeriesDatapoint{
+ {
+ TimestampNanos: manual.Now().UnixNano(),
+ Value: float64(123),
+ },
+ },
+ },
+ // App tenant metrics
+ {
+ Name: "cr.node.node-id",
+ Source: "1-123",
+ Datapoints: []tspb.TimeSeriesDatapoint{
+ {
+ TimestampNanos: manual.Now().UnixNano(),
+ Value: float64(nodeDesc.NodeID),
+ },
+ },
+ },
+ {
+ Name: "cr.node.some_metric",
+ Source: "1-123",
+ Datapoints: []tspb.TimeSeriesDatapoint{
+ {
+ TimestampNanos: manual.Now().UnixNano(),
+ Value: float64(456),
+ },
+ },
+ },
+ }
+
+ actualData := recorder.GetTimeSeriesData()
+
+ // compare actual vs expected values
+ sort.Sort(byTimeAndName(actualData))
+ sort.Sort(byTimeAndName(expectedData))
+ if a, e := actualData, expectedData; !reflect.DeepEqual(a, e) {
+ t.Errorf("recorder did not yield expected time series collection; diff:\n %v", pretty.Diff(e, a))
+ }
}
// TestMetricsRecorder verifies that the metrics recorder properly formats the
diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go
index 4db0154df274..a3d52af4b6ac 100644
--- a/pkg/server/status_test.go
+++ b/pkg/server/status_test.go
@@ -52,6 +52,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
+ "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
@@ -1640,6 +1641,9 @@ func TestStatusAPICombinedTransactions(t *testing.T) {
}
}
+	// Flush stats, as combinedstmts now reads only from persisted stats (system tables).
+ thirdServer.SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
+
// Hit query endpoint.
var resp serverpb.StatementsResponse
if err := getStatusJSONProto(firstServerProto, "combinedstmts", &resp); err != nil {
@@ -2012,6 +2016,8 @@ func TestStatusAPICombinedStatements(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
+ ctx := context.Background()
+
// Aug 30 2021 19:50:00 GMT+0000
aggregatedTs := int64(1630353000)
testCluster := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{
@@ -2050,6 +2056,8 @@ func TestStatusAPICombinedStatements(t *testing.T) {
thirdServerSQL.Exec(t, stmt.stmt)
}
+ testCluster.Server(2).SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
+
var resp serverpb.StatementsResponse
// Test that non-admin without VIEWACTIVITY privileges cannot access.
err := getStatusJSONProtoWithAdminOption(firstServerProto, "combinedstmts", &resp, false)
@@ -2057,7 +2065,7 @@ func TestStatusAPICombinedStatements(t *testing.T) {
t.Fatalf("expected privilege error, got %v", err)
}
- testPath := func(path string, expectedStmts []string) {
+ verifyStmts := func(path string, expectedStmts []string, hasTxns bool, t *testing.T) {
// Hit query endpoint.
if err := getStatusJSONProtoWithAdminOption(firstServerProto, path, &resp, false); err != nil {
t.Fatal(err)
@@ -2065,6 +2073,7 @@ func TestStatusAPICombinedStatements(t *testing.T) {
// See if the statements returned are what we executed.
var statementsInResponse []string
+ expectedTxnFingerprints := map[appstatspb.TransactionFingerprintID]struct{}{}
for _, respStatement := range resp.Statements {
if respStatement.Key.KeyData.Failed {
// We ignore failed statements here as the INSERT statement can fail and
@@ -2081,14 +2090,24 @@ func TestStatusAPICombinedStatements(t *testing.T) {
}
statementsInResponse = append(statementsInResponse, respStatement.Key.KeyData.Query)
+ expectedTxnFingerprints[respStatement.Key.KeyData.TransactionFingerprintID] = struct{}{}
+ }
+
+ for _, respTxn := range resp.Transactions {
+ delete(expectedTxnFingerprints, respTxn.StatsData.TransactionFingerprintID)
}
sort.Strings(expectedStmts)
sort.Strings(statementsInResponse)
if !reflect.DeepEqual(expectedStmts, statementsInResponse) {
- t.Fatalf("expected queries\n\n%v\n\ngot queries\n\n%v\n%s",
- expectedStmts, statementsInResponse, pretty.Sprint(resp))
+ t.Fatalf("expected queries\n\n%v\n\ngot queries\n\n%v\n%s\n path: %s",
+ expectedStmts, statementsInResponse, pretty.Sprint(resp), path)
+ }
+ if hasTxns {
+ assert.Empty(t, expectedTxnFingerprints)
+ } else {
+ assert.Empty(t, resp.Transactions)
}
}
@@ -2101,33 +2120,65 @@ func TestStatusAPICombinedStatements(t *testing.T) {
expectedStatements = append(expectedStatements, expectedStmt)
}
- // Grant VIEWACTIVITY.
- thirdServerSQL.Exec(t, fmt.Sprintf("ALTER USER %s VIEWACTIVITY", authenticatedUserNameNoAdmin().Normalized()))
+ oneMinAfterAggregatedTs := aggregatedTs + 60
- // Test with no query params.
- testPath("combinedstmts", expectedStatements)
+ t.Run("fetch_mode=combined, VIEWACTIVITY", func(t *testing.T) {
+ // Grant VIEWACTIVITY.
+ thirdServerSQL.Exec(t, fmt.Sprintf("ALTER USER %s VIEWACTIVITY", authenticatedUserNameNoAdmin().Normalized()))
+
+ // Test with no query params.
+ verifyStmts("combinedstmts", expectedStatements, true, t)
+ // Test with end = 1 min after aggregatedTs; should give the same results as get all.
+ verifyStmts(fmt.Sprintf("combinedstmts?end=%d", oneMinAfterAggregatedTs), expectedStatements, true, t)
+ // Test with start = 1 hour before aggregatedTs end = 1 min after aggregatedTs; should give same results as get all.
+ verifyStmts(fmt.Sprintf("combinedstmts?start=%d&end=%d", aggregatedTs-3600, oneMinAfterAggregatedTs),
+ expectedStatements, true, t)
+ // Test with start = 1 min after aggregatedTs; should give no results
+ verifyStmts(fmt.Sprintf("combinedstmts?start=%d", oneMinAfterAggregatedTs), nil, true, t)
+ })
- oneMinAfterAggregatedTs := aggregatedTs + 60
- // Test with end = 1 min after aggregatedTs; should give the same results as get all.
- testPath(fmt.Sprintf("combinedstmts?end=%d", oneMinAfterAggregatedTs), expectedStatements)
- // Test with start = 1 hour before aggregatedTs end = 1 min after aggregatedTs; should give same results as get all.
- testPath(fmt.Sprintf("combinedstmts?start=%d&end=%d", aggregatedTs-3600, oneMinAfterAggregatedTs), expectedStatements)
- // Test with start = 1 min after aggregatedTs; should give no results
- testPath(fmt.Sprintf("combinedstmts?start=%d", oneMinAfterAggregatedTs), nil)
+ t.Run("fetch_mode=combined, VIEWACTIVITYREDACTED", func(t *testing.T) {
+ // Remove VIEWACTIVITY so we can test with just the VIEWACTIVITYREDACTED role.
+ thirdServerSQL.Exec(t, fmt.Sprintf("ALTER USER %s NOVIEWACTIVITY", authenticatedUserNameNoAdmin().Normalized()))
+ // Grant VIEWACTIVITYREDACTED.
+ thirdServerSQL.Exec(t, fmt.Sprintf("ALTER USER %s VIEWACTIVITYREDACTED", authenticatedUserNameNoAdmin().Normalized()))
+
+ // Test with no query params.
+ verifyStmts("combinedstmts", expectedStatements, true, t)
+ // Test with end = 1 min after aggregatedTs; should give the same results as get all.
+ verifyStmts(fmt.Sprintf("combinedstmts?end=%d", oneMinAfterAggregatedTs), expectedStatements, true, t)
+ // Test with start = 1 hour before aggregatedTs end = 1 min after aggregatedTs; should give same results as get all.
+ verifyStmts(fmt.Sprintf("combinedstmts?start=%d&end=%d", aggregatedTs-3600, oneMinAfterAggregatedTs), expectedStatements, true, t)
+ // Test with start = 1 min after aggregatedTs; should give no results
+ verifyStmts(fmt.Sprintf("combinedstmts?start=%d", oneMinAfterAggregatedTs), nil, true, t)
+ })
- // Remove VIEWACTIVITY so we can test with just the VIEWACTIVITYREDACTED role.
- thirdServerSQL.Exec(t, fmt.Sprintf("ALTER USER %s NOVIEWACTIVITY", authenticatedUserNameNoAdmin().Normalized()))
- // Grant VIEWACTIVITYREDACTED.
- thirdServerSQL.Exec(t, fmt.Sprintf("ALTER USER %s VIEWACTIVITYREDACTED", authenticatedUserNameNoAdmin().Normalized()))
+ t.Run("fetch_mode=StmtsOnly", func(t *testing.T) {
+ verifyStmts("combinedstmts?fetch_mode.stats_type=0", expectedStatements, false, t)
+ })
- // Test with no query params.
- testPath("combinedstmts", expectedStatements)
- // Test with end = 1 min after aggregatedTs; should give the same results as get all.
- testPath(fmt.Sprintf("combinedstmts?end=%d", oneMinAfterAggregatedTs), expectedStatements)
- // Test with start = 1 hour before aggregatedTs end = 1 min after aggregatedTs; should give same results as get all.
- testPath(fmt.Sprintf("combinedstmts?start=%d&end=%d", aggregatedTs-3600, oneMinAfterAggregatedTs), expectedStatements)
- // Test with start = 1 min after aggregatedTs; should give no results
- testPath(fmt.Sprintf("combinedstmts?start=%d", oneMinAfterAggregatedTs), nil)
+ t.Run("fetch_mode=TxnsOnly with limit", func(t *testing.T) {
+ // Verify that we only return stmts for the txns in the response.
+		// The limit on the txns helps verify this behaviour.
+ if err := getStatusJSONProtoWithAdminOption(firstServerProto, "combinedstmts?fetch_mode.stats_type=1&limit=2",
+ &resp, false); err != nil {
+ t.Fatal(err)
+ }
+
+ assert.Equal(t, 2, len(resp.Transactions))
+ stmtFingerprintIDs := map[appstatspb.StmtFingerprintID]struct{}{}
+ for _, txn := range resp.Transactions {
+ for _, stmtFingerprint := range txn.StatsData.StatementFingerprintIDs {
+ stmtFingerprintIDs[stmtFingerprint] = struct{}{}
+ }
+ }
+
+ for _, stmt := range resp.Statements {
+ if _, ok := stmtFingerprintIDs[stmt.ID]; !ok {
+				t.Fatalf("unexpected stmt; stmt unrelated to a txn in the response: %s", stmt.Key.KeyData.Query)
+ }
+ }
+ })
}
func TestStatusAPIStatementDetails(t *testing.T) {
@@ -2136,6 +2187,8 @@ func TestStatusAPIStatementDetails(t *testing.T) {
// The liveness session might expire before the stress race can finish.
skip.UnderStressRace(t, "expensive tests")
+ ctx := context.Background()
+
// Aug 30 2021 19:50:00 GMT+0000
aggregatedTs := int64(1630353000)
testCluster := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{
@@ -2170,6 +2223,7 @@ func TestStatusAPIStatementDetails(t *testing.T) {
for _, stmt := range statements {
thirdServerSQL.Exec(t, stmt)
}
+
query := `INSERT INTO posts VALUES (_, '_')`
fingerprintID := appstatspb.ConstructStatementFingerprintID(query,
false, true, `roachblog`)
@@ -2193,6 +2247,9 @@ func TestStatusAPIStatementDetails(t *testing.T) {
}
testPath := func(path string, expected resultValues) {
+		// Need to flush since this endpoint reads only flushed (persisted) data.
+ testCluster.Server(2).SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
+
err := getStatusJSONProtoWithAdminOption(firstServerProto, path, &resp, false)
require.NoError(t, err)
require.Equal(t, int64(expected.totalCount), resp.Statement.Stats.Count)
diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go
index 4a64ed903035..22c6bfc40786 100644
--- a/pkg/server/tenant.go
+++ b/pkg/server/tenant.go
@@ -613,7 +613,9 @@ func (s *SQLServerWrapper) PreStart(ctx context.Context) error {
// We can now add the node registry.
s.recorder.AddNode(
s.registry,
- roachpb.NodeDescriptor{},
+ roachpb.NodeDescriptor{
+ NodeID: s.rpcContext.NodeID.Get(),
+ },
timeutil.Now().UnixNano(),
s.sqlServer.cfg.AdvertiseAddr,
s.sqlServer.cfg.HTTPAdvertiseAddr,
@@ -1035,7 +1037,7 @@ func makeTenantSQLServerArgs(
return sqlServerArgs{}, err
}
- sTS := ts.MakeTenantServer(baseCfg.AmbientCtx, tenantConnect)
+ sTS := ts.MakeTenantServer(baseCfg.AmbientCtx, tenantConnect, rpcContext.TenantID)
systemConfigWatcher := systemconfigwatcher.NewWithAdditionalProvider(
keys.MakeSQLCodec(sqlCfg.TenantID), clock, rangeFeedFactory, &baseCfg.DefaultZoneConfig,
diff --git a/pkg/ts/BUILD.bazel b/pkg/ts/BUILD.bazel
index 8269d03e603d..2e965cc58bf1 100644
--- a/pkg/ts/BUILD.bazel
+++ b/pkg/ts/BUILD.bazel
@@ -16,7 +16,6 @@ go_library(
"rollup.go",
"server.go",
"timespan.go",
- "util.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ts",
visibility = ["//visibility:public"],
@@ -33,6 +32,7 @@ go_library(
"//pkg/storage",
"//pkg/ts/catalog",
"//pkg/ts/tspb",
+ "//pkg/ts/tsutil",
"//pkg/util/contextutil",
"//pkg/util/encoding",
"//pkg/util/hlc",
@@ -72,11 +72,13 @@ go_test(
embed = [":ts"],
deps = [
"//pkg/base",
+ "//pkg/ccl/kvccl/kvtenantccl",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
+ "//pkg/multitenant/tenantcapabilities",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/security/securityassets",
@@ -87,6 +89,7 @@ go_test(
"//pkg/testutils",
"//pkg/testutils/localtestcluster",
"//pkg/testutils/serverutils",
+ "//pkg/testutils/testcluster",
"//pkg/ts/testmodel",
"//pkg/ts/tspb",
"//pkg/util/hlc",
diff --git a/pkg/ts/main_test.go b/pkg/ts/main_test.go
index 1d0b2494a23d..e18fe8dca2af 100644
--- a/pkg/ts/main_test.go
+++ b/pkg/ts/main_test.go
@@ -18,11 +18,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)
func TestMain(m *testing.M) {
securityassets.SetLoader(securitytest.EmbeddedAssets)
serverutils.InitTestServerFactory(server.TestServerFactory)
+ serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
os.Exit(m.Run())
}
diff --git a/pkg/ts/query.go b/pkg/ts/query.go
index f11550e419a4..2694e98fe488 100644
--- a/pkg/ts/query.go
+++ b/pkg/ts/query.go
@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
+ "github.com/cockroachdb/cockroach/pkg/ts/tsutil"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
)
@@ -533,6 +534,9 @@ func (db *DB) Query(
// queryChunk processes a chunk of a query; this will read the necessary data
// from disk and apply the desired processing operations to generate a result.
+//
+// sourceSet is an output parameter; it is populated with the set of all
+// sources included in the result.
func (db *DB) queryChunk(
ctx context.Context,
query tspb.Query,
@@ -552,9 +556,9 @@ func (db *DB) queryChunk(
var data []kv.KeyValue
var err error
if len(query.Sources) == 0 {
- data, err = db.readAllSourcesFromDatabase(ctx, query.Name, diskResolution, diskTimespan)
+ data, err = db.readAllSourcesFromDatabase(ctx, query.Name, diskResolution, diskTimespan, query.TenantID)
} else {
- data, err = db.readFromDatabase(ctx, query.Name, diskResolution, diskTimespan, query.Sources)
+ data, err = db.readFromDatabase(ctx, query.Name, diskResolution, diskTimespan, query.Sources, query.TenantID)
}
if err != nil {
@@ -592,8 +596,11 @@ func (db *DB) queryChunk(
}
// Add unique sources to the supplied source set.
+ // NB: we collapse each source to its unique primary component, since
+ // tenant-suffixed sources are not exposed at the API level.
for k := range sourceSpans {
- sourceSet[k] = struct{}{}
+ source, _ := tsutil.DecodeSource(k)
+ sourceSet[source] = struct{}{}
}
return nil
}
@@ -829,6 +836,7 @@ func (db *DB) readFromDatabase(
diskResolution Resolution,
timespan QueryTimespan,
sources []string,
+ tenantID roachpb.TenantID,
) ([]kv.KeyValue, error) {
// Iterate over all key timestamps which may contain data for the given
// sources, based on the given start/end time and the resolution.
@@ -837,8 +845,19 @@ func (db *DB) readFromDatabase(
kd := diskResolution.SlabDuration()
for currentTimestamp := startTimestamp; currentTimestamp <= timespan.EndNanos; currentTimestamp += kd {
for _, source := range sources {
- key := MakeDataKey(seriesName, source, diskResolution, currentTimestamp)
- b.Get(key)
+ // If a TenantID is specified and is not the system tenant, only query
+ // data for that tenant source.
+ if tenantID.IsSet() && !tenantID.IsSystem() {
+ source = tsutil.MakeTenantSource(source, tenantID.String())
+ key := MakeDataKey(seriesName, source, diskResolution, currentTimestamp)
+ b.Get(key)
+ } else {
+ // Otherwise, we scan all keys that prefix-match the source, since the
+ // system tenant reads all tenants' time series.
+ startKey := MakeDataKey(seriesName, source, diskResolution, currentTimestamp)
+ endKey := MakeDataKey(seriesName, source, diskResolution, currentTimestamp).PrefixEnd()
+ b.Scan(startKey, endKey)
+ }
}
}
if err := db.db.Run(ctx, b); err != nil {
@@ -846,11 +865,11 @@ func (db *DB) readFromDatabase(
}
var rows []kv.KeyValue
for _, result := range b.Results {
- row := result.Rows[0]
- if row.Value == nil {
+ if len(result.Rows) == 1 && result.Rows[0].Value == nil {
+ // This came from a Get that did not find the key.
continue
}
- rows = append(rows, row)
+ rows = append(rows, result.Rows...)
}
return rows, nil
}
@@ -860,7 +879,11 @@ func (db *DB) readFromDatabase(
// optional limit is used when memory usage is being limited by the number of
// keys, rather than by timespan.
func (db *DB) readAllSourcesFromDatabase(
- ctx context.Context, seriesName string, diskResolution Resolution, timespan QueryTimespan,
+ ctx context.Context,
+ seriesName string,
+ diskResolution Resolution,
+ timespan QueryTimespan,
+ tenantID roachpb.TenantID,
) ([]kv.KeyValue, error) {
// Based on the supplied timestamps and resolution, construct start and
// end keys for a scan that will return every key with data relevant to
@@ -878,7 +901,24 @@ func (db *DB) readAllSourcesFromDatabase(
if err := db.db.Run(ctx, b); err != nil {
return nil, err
}
- return b.Results[0].Rows, nil
+
+ if !tenantID.IsSet() || tenantID.IsSystem() {
+ return b.Results[0].Rows, nil
+ }
+
+ // Filter out rows that don't belong to the requested tenant's source.
+ var rows []kv.KeyValue
+ for _, row := range b.Results[0].Rows {
+ _, source, _, _, err := DecodeDataKey(row.Key)
+ if err != nil {
+ return nil, err
+ }
+ _, tenantSource := tsutil.DecodeSource(source)
+ if tenantSource == tenantID.String() {
+ rows = append(rows, row)
+ }
+ }
+ return rows, nil
}
// convertKeysToSpans converts a batch of KeyValues queried from disk into a
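
To make the read-path change above concrete, the sketch below shows how sources are keyed per tenant, using the "<nodeID>-<tenantID>" encoding from the pkg/ts/tsutil package introduced later in this patch. It is a standalone illustration, not part of the change.

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/ts/tsutil"
)

func main() {
	requested := []string{"1", "2"} // per-node sources named in the query.

	// A secondary tenant (here tenant 10) point-reads only its own suffixed
	// keys, so readFromDatabase rewrites each requested source before the Get.
	for _, src := range requested {
		fmt.Println("Get source:", tsutil.MakeTenantSource(src, "10")) // "1-10", "2-10"
	}

	// The system tenant instead prefix-scans each requested source, picking up
	// both the bare source ("1") and any tenant-suffixed variants ("1-10"),
	// which queryChunk later collapses back to the primary source.
	for _, src := range requested {
		fmt.Printf("Scan prefix: %q\n", src)
	}
}
```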
diff --git a/pkg/ts/server.go b/pkg/ts/server.go
index 81d3631bfe12..7da7a761bc19 100644
--- a/pkg/ts/server.go
+++ b/pkg/ts/server.go
@@ -102,9 +102,11 @@ type Server struct {
var _ tspb.TimeSeriesServer = &Server{}
type TenantServer struct {
+ // NB: TenantServer only implements Query from tspb.TimeSeriesServer.
tspb.UnimplementedTimeSeriesServer
log.AmbientContext
+ tenantID roachpb.TenantID
tenantConnect kvtenant.Connector
}
@@ -118,6 +120,10 @@ func (t *TenantServer) Query(
ctx context.Context, req *tspb.TimeSeriesQueryRequest,
) (*tspb.TimeSeriesQueryResponse, error) {
ctx = t.AnnotateCtx(ctx)
+ // Currently, secondary tenants are only able to view their own metrics.
+ for i := range req.Queries {
+ req.Queries[i].TenantID = t.tenantID
+ }
return t.tenantConnect.Query(ctx, req)
}
@@ -134,10 +140,13 @@ func (s *TenantServer) RegisterGateway(
return tspb.RegisterTimeSeriesHandler(ctx, mux, conn)
}
-func MakeTenantServer(ambient log.AmbientContext, tenantConnect kvtenant.Connector) *TenantServer {
+func MakeTenantServer(
+ ambient log.AmbientContext, tenantConnect kvtenant.Connector, tenantID roachpb.TenantID,
+) *TenantServer {
return &TenantServer{
AmbientContext: ambient,
tenantConnect: tenantConnect,
+ tenantID: tenantID,
}
}
diff --git a/pkg/ts/server_test.go b/pkg/ts/server_test.go
index e7f066acb387..03531b176fd3 100644
--- a/pkg/ts/server_test.go
+++ b/pkg/ts/server_test.go
@@ -20,9 +20,11 @@ import (
"unsafe"
"github.com/cockroachdb/cockroach/pkg/base"
+ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+ "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server"
@@ -36,6 +38,9 @@ import (
"github.com/stretchr/testify/require"
)
+// Dummy import to pull in kvtenantccl. This allows us to start tenants.
+var _ = kvtenantccl.Connector{}
+
func TestServerQuery(t *testing.T) {
defer leaktest.AfterTest(t)()
s, _, _ := serverutils.StartServer(t, base.TestServerArgs{
@@ -295,6 +300,232 @@ func TestServerQueryStarvation(t *testing.T) {
}
}
+func TestServerQueryTenant(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ testCluster := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{
+ ServerArgs: base.TestServerArgs{
+ DisableDefaultTestTenant: true,
+ Knobs: base.TestingKnobs{
+ Store: &kvserver.StoreTestingKnobs{
+ DisableTimeSeriesMaintenanceQueue: true,
+ },
+ },
+ },
+ })
+ defer testCluster.Stopper().Stop(context.Background())
+ tsrv := testCluster.Server(0).(*server.TestServer)
+ systemDB := serverutils.OpenDBConn(
+ t,
+ tsrv.ServingSQLAddr(),
+ "", /* useDatabase */
+ false, /* insecure */
+ tsrv.Stopper(),
+ )
+
+ // Populate data directly.
+ tsdb := tsrv.TsDB()
+ if err := tsdb.StoreData(context.Background(), ts.Resolution10s, []tspb.TimeSeriesData{
+ {
+ Name: "test.metric",
+ Source: "1",
+ Datapoints: []tspb.TimeSeriesDatapoint{
+ {
+ TimestampNanos: 400 * 1e9,
+ Value: 100.0,
+ },
+ {
+ TimestampNanos: 500 * 1e9,
+ Value: 200.0,
+ },
+ },
+ },
+ {
+ Name: "test.metric",
+ Source: "1-10",
+ Datapoints: []tspb.TimeSeriesDatapoint{
+ {
+ TimestampNanos: 400 * 1e9,
+ Value: 1.0,
+ },
+ {
+ TimestampNanos: 500 * 1e9,
+ Value: 2.0,
+ },
+ },
+ },
+ {
+ Name: "test.metric",
+ Source: "2",
+ Datapoints: []tspb.TimeSeriesDatapoint{
+ {
+ TimestampNanos: 400 * 1e9,
+ Value: 200.0,
+ },
+ {
+ TimestampNanos: 500 * 1e9,
+ Value: 400.0,
+ },
+ },
+ },
+ {
+ Name: "test.metric",
+ Source: "2-10",
+ Datapoints: []tspb.TimeSeriesDatapoint{
+ {
+ TimestampNanos: 400 * 1e9,
+ Value: 4.0,
+ },
+ {
+ TimestampNanos: 500 * 1e9,
+ Value: 5.0,
+ },
+ },
+ },
+ }); err != nil {
+ t.Fatal(err)
+ }
+
+ // The system tenant should aggregate across all tenants.
+ expectedSystemResult := &tspb.TimeSeriesQueryResponse{
+ Results: []tspb.TimeSeriesQueryResponse_Result{
+ {
+ Query: tspb.Query{
+ Name: "test.metric",
+ Sources: []string{"1"},
+ },
+ Datapoints: []tspb.TimeSeriesDatapoint{
+ {
+ TimestampNanos: 400 * 1e9,
+ Value: 101.0,
+ },
+ {
+ TimestampNanos: 500 * 1e9,
+ Value: 202.0,
+ },
+ },
+ },
+ {
+ Query: tspb.Query{
+ Name: "test.metric",
+ Sources: []string{"1", "2"},
+ },
+ Datapoints: []tspb.TimeSeriesDatapoint{
+ {
+ TimestampNanos: 400 * 1e9,
+ Value: 305.0,
+ },
+ {
+ TimestampNanos: 500 * 1e9,
+ Value: 607.0,
+ },
+ },
+ },
+ },
+ }
+
+ conn, err := tsrv.RPCContext().GRPCDialNode(tsrv.Cfg.Addr, tsrv.NodeID(),
+ rpc.DefaultClass).Connect(context.Background())
+ if err != nil {
+ t.Fatal(err)
+ }
+ client := tspb.NewTimeSeriesClient(conn)
+ systemResponse, err := client.Query(context.Background(), &tspb.TimeSeriesQueryRequest{
+ StartNanos: 400 * 1e9,
+ EndNanos: 500 * 1e9,
+ Queries: []tspb.Query{
+ {
+ Name: "test.metric",
+ Sources: []string{"1"},
+ },
+ {
+ Name: "test.metric",
+ },
+ },
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ for _, r := range systemResponse.Results {
+ sort.Strings(r.Sources)
+ }
+ require.Equal(t, expectedSystemResult, systemResponse)
+
+ // The app tenant should only report metrics with its tenant ID in the secondary source field.
+ tenantID := roachpb.MustMakeTenantID(10)
+ expectedTenantResponse := &tspb.TimeSeriesQueryResponse{
+ Results: []tspb.TimeSeriesQueryResponse_Result{
+ {
+ Query: tspb.Query{
+ Name: "test.metric",
+ Sources: []string{"1"},
+ TenantID: tenantID,
+ },
+ Datapoints: []tspb.TimeSeriesDatapoint{
+ {
+ TimestampNanos: 400 * 1e9,
+ Value: 1.0,
+ },
+ {
+ TimestampNanos: 500 * 1e9,
+ Value: 2.0,
+ },
+ },
+ },
+ {
+ Query: tspb.Query{
+ Name: "test.metric",
+ Sources: []string{"1", "2"},
+ TenantID: tenantID,
+ },
+ Datapoints: []tspb.TimeSeriesDatapoint{
+ {
+ TimestampNanos: 400 * 1e9,
+ Value: 5.0,
+ },
+ {
+ TimestampNanos: 500 * 1e9,
+ Value: 7.0,
+ },
+ },
+ },
+ },
+ }
+
+ tenant, _ := serverutils.StartTenant(t, testCluster.Server(0), base.TestTenantArgs{TenantID: tenantID})
+ _, err = systemDB.Exec("ALTER TENANT [10] GRANT CAPABILITY can_view_tsdb_metrics=true;\n")
+ if err != nil {
+ t.Fatal(err)
+ }
+ capability := map[tenantcapabilities.CapabilityID]string{tenantcapabilities.CanViewTSDBMetrics: "true"}
+ testCluster.WaitForTenantCapabilities(t, tenantID, capability)
+ tenantConn, err := tenant.(*server.TestTenant).RPCContext().GRPCDialNode(tenant.(*server.TestTenant).Cfg.AdvertiseAddr, tsrv.NodeID(), rpc.DefaultClass).Connect(context.Background())
+ if err != nil {
+ t.Fatal(err)
+ }
+ tenantClient := tspb.NewTimeSeriesClient(tenantConn)
+
+ tenantResponse, err := tenantClient.Query(context.Background(), &tspb.TimeSeriesQueryRequest{
+ StartNanos: 400 * 1e9,
+ EndNanos: 500 * 1e9,
+ Queries: []tspb.Query{
+ {
+ Name: "test.metric",
+ Sources: []string{"1"},
+ },
+ {
+ Name: "test.metric",
+ },
+ },
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ for _, r := range tenantResponse.Results {
+ sort.Strings(r.Sources)
+ }
+ require.Equal(t, expectedTenantResponse, tenantResponse)
+}
+
// TestServerQueryMemoryManagement verifies that queries succeed under
// constrained memory requirements.
func TestServerQueryMemoryManagement(t *testing.T) {
diff --git a/pkg/ts/tspb/timeseries.proto b/pkg/ts/tspb/timeseries.proto
index aaea13d0b1da..ba3c3d274f5c 100644
--- a/pkg/ts/tspb/timeseries.proto
+++ b/pkg/ts/tspb/timeseries.proto
@@ -103,6 +103,9 @@ message Query {
// An optional list of sources to restrict the time series query. If no
// sources are provided, all available sources will be queried.
repeated string sources = 5;
+ // An optional tenant ID to restrict the time series query. If no tenant ID
+ // is provided, time series will be aggregated across all available tenants.
+ optional roachpb.TenantID tenant_id = 6 [(gogoproto.nullable) = false, (gogoproto.customname) = "TenantID"];
}
// TimeSeriesQueryRequest is the standard incoming time series query request
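
A usage sketch for the new tenant_id field, built with the generated Go types used in the tests above. Setting the field directly is an illustration; in practice TenantServer.Query stamps it on behalf of secondary tenants.

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/ts/tspb"
)

func main() {
	tenantID := roachpb.MustMakeTenantID(10)
	req := &tspb.TimeSeriesQueryRequest{
		StartNanos: 400 * 1e9,
		EndNanos:   500 * 1e9,
		Queries: []tspb.Query{
			{
				Name:     "test.metric",
				Sources:  []string{"1"},
				TenantID: tenantID, // restrict the result to tenant 10's series.
			},
		},
	}
	fmt.Println(req)
}
```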
diff --git a/pkg/ts/tsutil/BUILD.bazel b/pkg/ts/tsutil/BUILD.bazel
new file mode 100644
index 000000000000..608649600d7d
--- /dev/null
+++ b/pkg/ts/tsutil/BUILD.bazel
@@ -0,0 +1,12 @@
+load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "tsutil",
+ srcs = ["util.go"],
+ importpath = "github.com/cockroachdb/cockroach/pkg/ts/tsutil",
+ visibility = ["//visibility:public"],
+ deps = ["//pkg/ts/tspb"],
+)
+
+get_x_data(name = "get_x_data")
diff --git a/pkg/ts/util.go b/pkg/ts/tsutil/util.go
similarity index 51%
rename from pkg/ts/util.go
rename to pkg/ts/tsutil/util.go
index 2221929ea398..146ad2949b2b 100644
--- a/pkg/ts/util.go
+++ b/pkg/ts/tsutil/util.go
@@ -8,11 +8,13 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
-package ts
+package tsutil
import (
"encoding/gob"
+ "fmt"
"io"
+ "strings"
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
)
@@ -34,3 +36,25 @@ func DumpRawTo(src tspb.TimeSeries_DumpRawClient, out io.Writer) error {
}
}
}
+
+// MakeTenantSource creates a source given a NodeID and a TenantID.
+func MakeTenantSource(nodeID string, tenantID string) string {
+ if tenantID != "" {
+ return fmt.Sprintf("%s-%s", nodeID, tenantID)
+ }
+ return nodeID
+}
+
+// DecodeSource splits a source into its individual components.
+//
+// primarySource can refer to NodeID or StoreID depending on the metric stored.
+// tenantSource refers to the TenantID of the secondary tenant (empty for the
+// system tenant, for backwards compatibility).
+func DecodeSource(source string) (primarySource string, tenantSource string) {
+ splitSources := strings.Split(source, "-")
+ primarySource = splitSources[0]
+ if len(splitSources) > 1 {
+ tenantSource = splitSources[1]
+ }
+ return primarySource, tenantSource
+}
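
A quick round-trip check of the encoding above; this is a hypothetical test sketch, not included in the patch.

```go
package tsutil_test

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/ts/tsutil"
)

func TestSourceRoundTrip(t *testing.T) {
	// Tenant-suffixed sources round-trip through encode/decode.
	src := tsutil.MakeTenantSource("1", "10")
	if src != "1-10" {
		t.Fatalf("unexpected source: %s", src)
	}
	if primary, tenant := tsutil.DecodeSource(src); primary != "1" || tenant != "10" {
		t.Fatalf("unexpected decode: %s, %s", primary, tenant)
	}

	// System-tenant sources have no suffix and decode to an empty tenant,
	// preserving compatibility with pre-existing keys.
	if got := tsutil.MakeTenantSource("1", ""); got != "1" {
		t.Fatalf("unexpected source: %s", got)
	}
	if _, tenant := tsutil.DecodeSource("1"); tenant != "" {
		t.Fatalf("expected empty tenant, got %q", tenant)
	}
}
```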
diff --git a/pkg/ui/workspaces/cluster-ui/src/api/statementsApi.ts b/pkg/ui/workspaces/cluster-ui/src/api/statementsApi.ts
index 8a91304ef8f1..ac8a795c0be6 100644
--- a/pkg/ui/workspaces/cluster-ui/src/api/statementsApi.ts
+++ b/pkg/ui/workspaces/cluster-ui/src/api/statementsApi.ts
@@ -9,7 +9,7 @@
// licenses/APL.txt.
import { cockroach } from "@cockroachlabs/crdb-protobuf-client";
-import { fetchData } from "src/api";
+import { fetchData } from "src/api/fetchData";
import {
FixFingerprintHexValue,
HexStringToInt64String,
@@ -18,6 +18,8 @@ import {
stringToTimestamp,
} from "src/util";
import Long from "long";
+import moment from "moment";
+
import { AggregateStatistics } from "../statementsTable";
const STATEMENTS_PATH = "/_status/combinedstmts";
const STATEMENT_DETAILS_PATH = "/_status/stmtdetails";
@@ -32,24 +34,84 @@ export type StatementDetailsResponseWithKey = {
stmtResponse: StatementDetailsResponse;
key: string;
};
+
+export type SqlStatsResponse = cockroach.server.serverpb.StatementsResponse;
+export const SqlStatsSortOptions = cockroach.server.serverpb.StatsSortOptions;
+export type SqlStatsSortType = cockroach.server.serverpb.StatsSortOptions;
+
+const FetchStatsMode =
+ cockroach.server.serverpb.CombinedStatementsStatsRequest.StatsType;
+
export type ErrorWithKey = {
err: Error;
key: string;
};
+export const DEFAULT_STATS_REQ_OPTIONS = {
+ limit: 100,
+ sort: SqlStatsSortOptions.SERVICE_LAT,
+};
+
+// The required fields to create a stmts request.
+type StmtReqFields = {
+ limit: number;
+ sort: SqlStatsSortType;
+ start: moment.Moment;
+ end: moment.Moment;
+};
+
+export function createCombinedStmtsRequest({
+ limit,
+ sort,
+ start,
+ end,
+}: StmtReqFields): StatementsRequest {
+ return new cockroach.server.serverpb.CombinedStatementsStatsRequest({
+ start: Long.fromNumber(start.unix()),
+ end: Long.fromNumber(end.unix()),
+ limit: Long.fromNumber(limit ?? DEFAULT_STATS_REQ_OPTIONS.limit),
+ fetch_mode:
+ new cockroach.server.serverpb.CombinedStatementsStatsRequest.FetchMode({
+ sort: sort,
+ }),
+ });
+}
+
export const getCombinedStatements = (
req: StatementsRequest,
-): Promise<cockroach.server.serverpb.StatementsResponse> => {
+): Promise<SqlStatsResponse> => {
const queryStr = propsToQueryString({
start: req.start.toInt(),
end: req.end.toInt(),
+ "fetch_mode.stats_type": FetchStatsMode.StmtStatsOnly,
+ "fetch_mode.sort": req.fetch_mode?.sort,
+ limit: req.limit?.toInt() ?? DEFAULT_STATS_REQ_OPTIONS.limit,
});
return fetchData(
cockroach.server.serverpb.StatementsResponse,
`${STATEMENTS_PATH}?${queryStr}`,
null,
null,
- "30M",
+ "10M",
+ );
+};
+
+export const getFlushedTxnStatsApi = (
+ req: StatementsRequest,
+): Promise<SqlStatsResponse> => {
+ const queryStr = propsToQueryString({
+ start: req.start?.toInt(),
+ end: req.end?.toInt(),
+ "fetch_mode.stats_type": FetchStatsMode.TxnStatsOnly,
+ "fetch_mode.sort": req.fetch_mode?.sort,
+ limit: req.limit?.toInt() ?? DEFAULT_STATS_REQ_OPTIONS.limit,
+ });
+ return fetchData(
+ cockroach.server.serverpb.StatementsResponse,
+ `${STATEMENTS_PATH}?${queryStr}`,
+ null,
+ null,
+ "10M",
);
};
diff --git a/pkg/ui/workspaces/cluster-ui/src/barCharts/barCharts.tsx b/pkg/ui/workspaces/cluster-ui/src/barCharts/barCharts.tsx
index 8254acd09dab..8af6e7fe1f46 100644
--- a/pkg/ui/workspaces/cluster-ui/src/barCharts/barCharts.tsx
+++ b/pkg/ui/workspaces/cluster-ui/src/barCharts/barCharts.tsx
@@ -149,10 +149,11 @@ export function workloadPctBarChart(
return barChartFactory(
"grey",
[
- bar(
- "pct-workload",
- (d: StatementStatistics) =>
- (d.stats.service_lat.mean * longToInt(d.stats.count)) / totalWorkload,
+ bar("pct-workload", (d: StatementStatistics) =>
+ totalWorkload !== 0
+ ? (d.stats.service_lat.mean * longToInt(d.stats.count)) /
+ totalWorkload
+ : 0,
),
],
v => Percentage(v, 1, 1),
diff --git a/pkg/ui/workspaces/cluster-ui/src/columnsSelector/columnsSelector.module.scss b/pkg/ui/workspaces/cluster-ui/src/columnsSelector/columnsSelector.module.scss
index 38a7574c7ea1..f249709fd869 100644
--- a/pkg/ui/workspaces/cluster-ui/src/columnsSelector/columnsSelector.module.scss
+++ b/pkg/ui/workspaces/cluster-ui/src/columnsSelector/columnsSelector.module.scss
@@ -6,7 +6,7 @@
}
&__btn {
- height: $line-height--large;
+ height: $line-height--larger;
width: 67px;
font-size: $font-size--small;
}
@@ -36,9 +36,10 @@
}
}
-.float {
+.btn-area {
float: left;
margin-right: 7px;
+ font-size: $font-size--medium;
}
.label {
diff --git a/pkg/ui/workspaces/cluster-ui/src/columnsSelector/columnsSelector.tsx b/pkg/ui/workspaces/cluster-ui/src/columnsSelector/columnsSelector.tsx
index a731fdc76de5..f7a631778c90 100644
--- a/pkg/ui/workspaces/cluster-ui/src/columnsSelector/columnsSelector.tsx
+++ b/pkg/ui/workspaces/cluster-ui/src/columnsSelector/columnsSelector.tsx
@@ -36,6 +36,7 @@ export interface ColumnsSelectorProps {
// options provides the list of available columns and their initial selection state
options: SelectOption[];
onSubmitColumns: (selectedColumns: string[]) => void;
+ size?: "default" | "small";
}
export interface ColumnsSelectorState {
@@ -222,6 +223,7 @@ export default class ColumnsSelector extends React.Component<
render(): React.ReactElement {
const { hide } = this.state;
+ const { size = "default" } = this.props;
const dropdownArea = hide ? hidden : dropdown;
const options = this.getOptions();
const columnsSelected = options.filter(o => o.isSelected);
@@ -230,9 +232,9 @@ export default class ColumnsSelector extends React.Component<