Skip to content

Commit

Permalink
serverccl: support IndexUsageStatistics RPC fanout for tenant status …
Browse files Browse the repository at this point in the history
…server

Resolves cockroachdb#70878

Release note (api change): Serverless's IndexUsageStatistics RPC now
returns cluster-wide data.
  • Loading branch information
Azhng committed Oct 1, 2021
1 parent 1057ea6 commit d82fb32
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 64 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/serverccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ go_test(
"//pkg/sql",
"//pkg/sql/catalog/catconstants",
"//pkg/sql/catalog/descpb",
"//pkg/sql/idxusage",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/sqlstats",
"//pkg/sql/tests",
Expand Down
83 changes: 81 additions & 2 deletions pkg/ccl/serverccl/tenant_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/idxusage"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -208,7 +210,7 @@ func TestResetSQLStatsRPCForTenant(t *testing.T) {
"SELECT 1, 1, 1",
}

testHelper := newTestTenantHelper(t, 3 /* tenantClusterSize */)
testHelper := newTestTenantHelper(t, 3 /* tenantClusterSize */, tests.CreateTestingKnobs())
defer testHelper.cleanup(ctx, t)

testCluster := testHelper.testCluster()
Expand Down Expand Up @@ -318,7 +320,8 @@ func TestContentionEventsForTenant(t *testing.T) {

ctx := context.Background()

testHelper := newTestTenantHelper(t, 3 /* tenantClusterSize */)
testHelper :=
newTestTenantHelper(t, 3 /* tenantClusterSize */, tests.CreateTestingKnobs())
defer testHelper.cleanup(ctx, t)

testingCluster := testHelper.testCluster()
Expand Down Expand Up @@ -383,3 +386,79 @@ SET TRACING=off;
}
}
}

// TestIndexUsageForTenants verifies that the IndexUsageStatistics RPC fans
// out across all SQL pods of a tenant: index reads recorded on different
// pods are visible in a single cluster-wide view, and a different tenant
// (the control cluster) cannot observe them.
func TestIndexUsageForTenants(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	skip.UnderStressRace(t, "expensive tests")

	ctx := context.Background()

	// The callback publishes every ingested index-usage key to the notifier
	// channel so the test can wait for ingestion instead of sleeping.
	statsIngestionCb, statsIngestionNotifier := idxusage.CreateIndexStatsIngestedCallbackForTest()

	knobs := tests.CreateTestingKnobs()
	knobs.IndexUsageStatsKnobs = &idxusage.TestingKnobs{
		OnIndexUsageStatsProcessedCallback: statsIngestionCb,
	}
	testHelper := newTestTenantHelper(t, 3 /* tenantClusterSize */, knobs)
	defer testHelper.cleanup(ctx, t)

	testingCluster := testHelper.testCluster()

	testingCluster.tenantConn(0).Exec(t, `
CREATE TABLE test (
  k INT PRIMARY KEY,
  a INT,
  b INT,
  INDEX(a)
)
`)

	testingCluster.tenantConn(0).Exec(t, `
INSERT INTO test
VALUES (1, 10, 100), (2, 20, 200), (3, 30, 300)
`)

	// Record scan on primary index.
	testingCluster.tenantConn(0).Exec(t, "SELECT * FROM test")

	// Record scan on secondary index.
	testingCluster.tenantConn(1).Exec(t, "SELECT * FROM test@test_a_idx")
	testTableIDStr := testingCluster.tenantConn(2).QueryStr(t, "SELECT 'test'::regclass::oid")[0][0]
	testTableID, err := strconv.Atoi(testTableIDStr)
	require.NoError(t, err)

	// Wait for the stats to be ingested: one event per index read above.
	require.NoError(t,
		idxusage.WaitForIndexStatsIngestionForTest(statsIngestionNotifier, map[roachpb.IndexUsageKey]struct{}{
			{
				TableID: roachpb.TableID(testTableID),
				IndexID: 1,
			}: {},
			{
				TableID: roachpb.TableID(testTableID),
				IndexID: 2,
			}: {},
		}, 2 /* expectedEventCnt */, 5*time.Second /* timeout */),
	)

	// Query from a third pod: the stats recorded on pods 0 and 1 must be
	// aggregated cluster-wide by the RPC fanout.
	query := `
SELECT
  table_id,
  index_id,
  total_reads,
  extract_duration('second', now() - last_read) < 5
FROM
  crdb_internal.index_usage_statistics
WHERE
  table_id = $1
`
	actual := testingCluster.tenantConn(2).QueryStr(t, query, testTableID)
	expected := [][]string{
		{testTableIDStr, "1", "1", "true"},
		{testTableIDStr, "2", "1", "true"},
	}

	require.Equal(t, expected, actual)

	// A different tenant must not be able to see the testing tenant's index
	// usage statistics.
	controlledCluster := testHelper.controlCluster()
	actual = controlledCluster.tenantConn(0).QueryStr(t, query, testTableID)
	require.Empty(t, actual)
}
22 changes: 18 additions & 4 deletions pkg/ccl/serverccl/tenant_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@ type testTenant struct {
}

func newTestTenant(
t *testing.T, server serverutils.TestServerInterface, existing bool, tenantID roachpb.TenantID,
t *testing.T,
server serverutils.TestServerInterface,
existing bool,
tenantID roachpb.TenantID,
knobs base.TestingKnobs,
) *testTenant {
t.Helper()

tenantParams := tests.CreateTestTenantParams(tenantID)
tenantParams.Existing = existing
tenantParams.TestingKnobs = knobs

log.TestingClearServerIdentifiers()
tenant, tenantConn := serverutils.StartTenant(t, server, tenantParams)
Expand Down Expand Up @@ -71,7 +76,9 @@ type tenantTestHelper struct {
tenantControlCluster tenantCluster
}

func newTestTenantHelper(t *testing.T, tenantClusterSize int) *tenantTestHelper {
func newTestTenantHelper(
t *testing.T, tenantClusterSize int, knobs base.TestingKnobs,
) *tenantTestHelper {
t.Helper()

params, _ := tests.CreateTestServerParams()
Expand All @@ -87,12 +94,14 @@ func newTestTenantHelper(t *testing.T, tenantClusterSize int) *tenantTestHelper
server,
tenantClusterSize,
security.EmbeddedTenantIDs()[0],
knobs,
),
tenantControlCluster: newTenantCluster(
t,
server,
tenantClusterSize,
security.EmbeddedTenantIDs()[1],
knobs,
),
}
}
Expand All @@ -115,14 +124,19 @@ func (h *tenantTestHelper) cleanup(ctx context.Context, t *testing.T) {
type tenantCluster []*testTenant

func newTenantCluster(
t *testing.T, server serverutils.TestServerInterface, tenantClusterSize int, tenantID uint64,
t *testing.T,
server serverutils.TestServerInterface,
tenantClusterSize int,
tenantID uint64,
knobs base.TestingKnobs,
) tenantCluster {
t.Helper()

cluster := make([]*testTenant, tenantClusterSize)
existing := false
for i := 0; i < tenantClusterSize; i++ {
cluster[i] = newTestTenant(t, server, existing, roachpb.MakeTenantID(tenantID))
cluster[i] =
newTestTenant(t, server, existing, roachpb.MakeTenantID(tenantID), knobs)
existing = true
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/rpc/auth_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ func (a tenantAuthorizer) authorize(
case "/cockroach.server.serverpb.Status/ListLocalSessions":
return a.authTenant(tenID)

case "/cockroach.server.serverpb.Status/IndexUsageStatistics":
return a.authTenant(tenID)

case "/cockroach.roachpb.Internal/GetSpanConfigs":
return a.authGetSpanConfigs(tenID, req.(*roachpb.GetSpanConfigsRequest))

Expand Down
52 changes: 7 additions & 45 deletions pkg/server/index_usage_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,51 +54,11 @@ func compareStatsHelper(
require.Equal(t, expected, actual)
}

// createIndexStatsIngestedCallback returns a callback suitable for the
// index-usage ingestion testing knob, together with a channel on which
// every key passed to the callback is republished. The channel is
// buffered so the callback never blocks the ingestion path.
func createIndexStatsIngestedCallback() (
	func(key roachpb.IndexUsageKey),
	chan roachpb.IndexUsageKey,
) {
	// Buffer enough events that ingestion never stalls on the test reader.
	const bufferSize = 100
	ingested := make(chan roachpb.IndexUsageKey, bufferSize)

	return func(key roachpb.IndexUsageKey) {
		ingested <- key
	}, ingested
}

// waitForStatsIngestion blocks until expectedEventCnt events whose keys
// appear in expectedKeys have been received on notify. Events for keys
// outside expectedKeys are drained but not counted. If the expected count
// is not reached within timeout, the test is failed.
func waitForStatsIngestion(
	t *testing.T,
	notify chan roachpb.IndexUsageKey,
	expectedKeys map[roachpb.IndexUsageKey]struct{},
	expectedEventCnt int,
	timeout time.Duration,
) {
	var deadline timeutil.Timer
	deadline.Reset(timeout)

	for remaining := expectedEventCnt; remaining > 0; {
		select {
		case key := <-notify:
			if _, matched := expectedKeys[key]; matched {
				remaining--
			}
		case <-deadline.C:
			deadline.Read = true
			t.Fatalf("expected stats ingestion to complete within %s, but it timed out", timeout)
		}
	}
}

func TestStatusAPIIndexUsage(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

statsIngestionCb, statsIngestionNotifier := createIndexStatsIngestedCallback()
statsIngestionCb, statsIngestionNotifier := idxusage.CreateIndexStatsIngestedCallbackForTest()

testCluster := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Expand Down Expand Up @@ -214,10 +174,12 @@ func TestStatusAPIIndexUsage(t *testing.T) {
thirdLocalStatsReader := thirdServer.SQLServer().(*sql.Server).GetLocalIndexStatistics()

// Wait for the stats to be ingested.
waitForStatsIngestion(t, statsIngestionNotifier, map[roachpb.IndexUsageKey]struct{}{
indexKeyA: {},
indexKeyB: {},
}, /* expectedKeys */ 4 /* expectedEventCnt*/, 5*time.Second /* timeout */)
require.NoError(t,
idxusage.WaitForIndexStatsIngestionForTest(statsIngestionNotifier, map[roachpb.IndexUsageKey]struct{}{
indexKeyA: {},
indexKeyB: {},
}, /* expectedKeys */ 4 /* expectedEventCnt*/, 5*time.Second /* timeout */),
)

// First node should have nothing.
stats := firstLocalStatsReader.Get(indexKeyA.TableID, indexKeyA.IndexID)
Expand Down
62 changes: 59 additions & 3 deletions pkg/server/tenant_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,12 +582,68 @@ func (t *tenantStatusServer) Profile(
}

// IndexUsageStatistics returns index usage statistics for the tenant.
//
// When req.NodeID is empty, the call fans out to every SQL pod of the
// tenant and aggregates their node-local statistics. When req.NodeID names
// a specific instance (or "local"), only that instance's local statistics
// are returned.
//
// NOTE: the scraped diff had merged the pre-change lines (the old `request`
// parameter line and the old two-line local-only body) into this function;
// they are removed here so the fanout code is reachable.
func (t *tenantStatusServer) IndexUsageStatistics(
	ctx context.Context, req *serverpb.IndexUsageStatisticsRequest,
) (*serverpb.IndexUsageStatisticsResponse, error) {
	ctx = propagateGatewayMetadata(ctx)
	ctx = t.AnnotateCtx(ctx)

	// Viewing index usage stats requires the VIEWACTIVITY permission.
	if _, err := t.privilegeChecker.requireViewActivityPermission(ctx); err != nil {
		return nil, err
	}

	// We issue localReq instead of the incoming req to other nodes. This
	// instructs other nodes to only return their node-local stats and not
	// propagate the RPC call any further.
	localReq := &serverpb.IndexUsageStatisticsRequest{
		NodeID: "local",
	}

	if len(req.NodeID) > 0 {
		parsedInstanceID, local, err := t.parseInstanceID(req.NodeID)
		if err != nil {
			return nil, status.Error(codes.InvalidArgument, err.Error())
		}
		if local {
			statsReader := t.sqlServer.pgServer.SQLServer.GetLocalIndexStatistics()
			return indexUsageStatsLocal(statsReader)
		}

		// Proxy the request to the specific instance that was asked for.
		instance, err := t.sqlServer.sqlInstanceProvider.GetInstance(ctx, parsedInstanceID)
		if err != nil {
			return nil, err
		}
		statusClient, err := t.dialPod(ctx, parsedInstanceID, instance.InstanceAddr)
		if err != nil {
			return nil, err
		}
		return statusClient.IndexUsageStatistics(ctx, localReq)
	}

	// No target instance: fan out to all pods and merge their local stats.
	fetchIndexUsageStats := func(ctx context.Context, client interface{}, _ base.SQLInstanceID) (interface{}, error) {
		statusClient := client.(serverpb.StatusClient)
		return statusClient.IndexUsageStatistics(ctx, localReq)
	}

	resp := &serverpb.IndexUsageStatisticsResponse{}
	aggFn := func(_ base.SQLInstanceID, nodeResp interface{}) {
		stats := nodeResp.(*serverpb.IndexUsageStatisticsResponse)
		resp.Statistics = append(resp.Statistics, stats.Statistics...)
	}

	// NOTE(review): combinedError accumulates per-node failures but is never
	// returned or logged — per-pod errors are silently dropped. Confirm
	// whether this best-effort behavior is intended or combinedError should
	// be surfaced to the caller.
	var combinedError error
	errFn := func(_ base.SQLInstanceID, nodeFnError error) {
		combinedError = errors.CombineErrors(combinedError, nodeFnError)
	}

	if err := t.iteratePods(ctx, fmt.Sprintf("requesting index usage stats for instance %s", req.NodeID),
		t.dialCallback,
		fetchIndexUsageStats,
		aggFn,
		errFn,
	); err != nil {
		return nil, err
	}

	return resp, nil
}
Loading

0 comments on commit d82fb32

Please sign in to comment.