Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
70959: serverccl: support ListContentionEvents RPC endpoint for Serverless r=maryliag,yuzefovich a=Azhng

Previously, the ListContentionEvents RPC endpoint only returned node-local
contention events.
This commit introduces cluster RPC fanout for the ListContentionEvents endpoint.
This commit also updates the ContentionRegistry to handle keys that carry a tenant ID prefix.

Resolves cockroachdb#68632

Release note (api change): The ListContentionEvents RPC now returns cluster-wide
contention events.

Co-authored-by: Azhng <[email protected]>
  • Loading branch information
craig[bot] and Azhng committed Oct 1, 2021
2 parents aa7b70b + b50f1e0 commit 1057ea6
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 8 deletions.
16 changes: 12 additions & 4 deletions docs/generated/http/full.md
Original file line number Diff line number Diff line change
Expand Up @@ -1578,7 +1578,9 @@ Response object for ListContentionEvents and ListLocalContentionEvents.
#### ListActivityError

An error wrapper object for ListContentionEventsResponse and
ListDistSQLFlowsResponse.
ListDistSQLFlowsResponse. Similar to the Statements endpoint, when
implemented on a tenant, the `node_id` field refers to the instanceIDs that
identify individual tenant pods.

| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
Expand Down Expand Up @@ -1651,7 +1653,9 @@ Response object for ListContentionEvents and ListLocalContentionEvents.
#### ListActivityError

An error wrapper object for ListContentionEventsResponse and
ListDistSQLFlowsResponse.
ListDistSQLFlowsResponse. Similar to the Statements endpoint, when
implemented on a tenant, the `node_id` field refers to the instanceIDs that
identify individual tenant pods.

| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
Expand Down Expand Up @@ -1743,7 +1747,9 @@ Info contains an information about a single DistSQL remote flow.
#### ListActivityError

An error wrapper object for ListContentionEventsResponse and
ListDistSQLFlowsResponse.
ListDistSQLFlowsResponse. Similar to the Statements endpoint, when
implemented on a tenant, the `node_id` field refers to the instanceIDs that
identify individual tenant pods.

| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
Expand Down Expand Up @@ -1834,7 +1840,9 @@ Info contains an information about a single DistSQL remote flow.
#### ListActivityError

An error wrapper object for ListContentionEventsResponse and
ListDistSQLFlowsResponse.
ListDistSQLFlowsResponse. Similar to the Statements endpoint, when
implemented on a tenant, the `node_id` field refers to the instanceIDs that
identify individual tenant pods.

| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/serverccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ go_test(
"//pkg/server/serverpb",
"//pkg/sql",
"//pkg/sql/catalog/catconstants",
"//pkg/sql/catalog/descpb",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/sqlstats",
"//pkg/sql/tests",
Expand Down
76 changes: 76 additions & 0 deletions pkg/ccl/serverccl/tenant_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"fmt"
"net/url"
"sort"
"strconv"
"strings"
"testing"

Expand All @@ -23,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -307,3 +309,77 @@ func ensureExpectedStmtFingerprintExistsInRPCResponse(
"%s tenant cluster, but it was not found", fingerprint, clusterType)
}
}

// TestContentionEventsForTenant verifies that ListContentionEvents, issued
// against one SQL server of a three-server tenant cluster, reports a
// contention event on test.test that was generated through connections to two
// other servers of the same cluster, and that the control cluster's
// ListContentionEvents response does not contain that event.
func TestContentionEventsForTenant(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderStressRace(t, "expensive tests")

ctx := context.Background()

testHelper := newTestTenantHelper(t, 3 /* tenantClusterSize */)
defer testHelper.cleanup(ctx, t)

// NOTE(review): controlCluster is presumably a separate tenant that must not
// observe the test tenant's events — confirm against newTestTenantHelper.
testingCluster := testHelper.testCluster()
controlledCluster := testHelper.controlCluster()

// Create table "test" with one row through the first tenant server.
// NOTE(review): the statements below rely on that row having x = 1, which
// presumably comes from sqlutils.RowIdxFn — confirm.
sqlutils.CreateTable(
t,
testingCluster[0].tenantConn,
"test",
"x INT PRIMARY KEY",
1, /* numRows */
sqlutils.ToRowFn(sqlutils.RowIdxFn),
)

// Look up the table's OID so that contention events can be matched by
// TableID further down.
testTableID, err :=
strconv.Atoi(testingCluster.tenantConn(0).QueryStr(t, "SELECT 'test.test'::regclass::oid")[0][0])
require.NoError(t, err)

testingCluster.tenantConn(0).Exec(t, "USE test")
testingCluster.tenantConn(1).Exec(t, "USE test")

// Connection 0 opens a transaction and updates the row, but does not commit
// in this batch, so the transaction is still open when connection 1 runs.
testingCluster.tenantConn(0).Exec(t, `
BEGIN;
UPDATE test SET x = 100 WHERE x = 1;
`)
// Connection 1 updates the same row in a high-priority transaction with
// tracing enabled, and commits.
testingCluster.tenantConn(1).Exec(t, `
SET TRACING=on;
BEGIN PRIORITY HIGH;
UPDATE test SET x = 1000 WHERE x = 1;
COMMIT;
SET TRACING=off;
`)
// Connection 0's commit is now expected to fail with a "restart transaction"
// error.
testingCluster.tenantConn(0).ExpectErr(
t,
"^pq: restart transaction.+",
"COMMIT;",
)

// Ask the server at index 2 — which served neither of the connections used
// above — for the contention events.
resp, err :=
testingCluster.tenantStatusSrv(2).ListContentionEvents(ctx, &serverpb.ListContentionEventsRequest{})
require.NoError(t, err)

require.GreaterOrEqualf(t, len(resp.Events.IndexContentionEvents), 1,
"expecting at least 1 contention event, but found none")

// Search for an event on index 1 of the test table.
// NOTE(review): index ID 1 is presumably the primary index — confirm.
found := false
for _, event := range resp.Events.IndexContentionEvents {
if event.TableID == descpb.ID(testTableID) && event.IndexID == descpb.IndexID(1) {
found = true
break
}
}

require.True(t, found,
"expect to find contention event for table %d, but found %+v", testTableID, resp)

// The same table/index pair must not show up in the control cluster's
// response. Every matching event is reported; the loop does not stop at the
// first one.
resp, err = controlledCluster.tenantStatusSrv(0).ListContentionEvents(ctx, &serverpb.ListContentionEventsRequest{})
require.NoError(t, err)
for _, event := range resp.Events.IndexContentionEvents {
if event.TableID == descpb.ID(testTableID) && event.IndexID == descpb.IndexID(1) {
t.Errorf("did not expect contention event in controlled cluster, but it was found")
}
}
}
6 changes: 6 additions & 0 deletions pkg/rpc/auth_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ func (a tenantAuthorizer) authorize(
case "/cockroach.server.serverpb.Status/ResetSQLStats":
return a.authTenant(tenID)

case "/cockroach.server.serverpb.Status/ListContentionEvents":
return a.authTenant(tenID)

case "/cockroach.server.serverpb.Status/ListLocalContentionEvents":
return a.authTenant(tenID)

case "/cockroach.server.serverpb.Status/ListSessions":
return a.authTenant(tenID)

Expand Down
4 changes: 3 additions & 1 deletion pkg/server/serverpb/status.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion pkg/server/serverpb/status.proto
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,9 @@ message CancelSessionResponse {
message ListContentionEventsRequest {}

// An error wrapper object for ListContentionEventsResponse and
// ListDistSQLFlowsResponse.
// ListDistSQLFlowsResponse. Similar to the Statements endpoint, when
// implemented on a tenant, the `node_id` field refers to the instanceIDs that
// identify individual tenant pods.
message ListActivityError {
// ID of node that was being contacted when this error occurred.
int32 node_id = 1 [
Expand Down
76 changes: 76 additions & 0 deletions pkg/server/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2662,3 +2662,79 @@ func TestLicenseExpiryMetricNoLicense(t *testing.T) {
})
}
}

// TestStatusAPIContentionEvents verifies that the "contention_events" status
// endpoint, queried on the third server of a three-server test cluster,
// returns a contention event on test.test that was generated through SQL
// connections to the first two servers.
func TestStatusAPIContentionEvents(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

params, _ := tests.CreateTestServerParams()
ctx := context.Background()
testCluster := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{
ServerArgs: params,
})

defer testCluster.Stopper().Stop(ctx)

server1Conn := sqlutils.MakeSQLRunner(testCluster.ServerConn(0))
server2Conn := sqlutils.MakeSQLRunner(testCluster.ServerConn(1))

// Create table "test" with one row through the first server.
// NOTE(review): the statements below rely on that row having x = 1, which
// presumably comes from sqlutils.RowIdxFn — confirm.
sqlutils.CreateTable(
t,
testCluster.ServerConn(0),
"test",
"x INT PRIMARY KEY",
1, /* numRows */
sqlutils.ToRowFn(sqlutils.RowIdxFn),
)

// Look up the table's OID so that contention events can be matched by
// TableID further down.
testTableID, err :=
strconv.Atoi(server1Conn.QueryStr(t, "SELECT 'test.test'::regclass::oid")[0][0])
require.NoError(t, err)

server1Conn.Exec(t, "USE test")
server2Conn.Exec(t, "USE test")

// The first connection enables tracing, opens a transaction and updates the
// row, but does not commit in this batch, so the transaction is still open
// when the second connection runs.
server1Conn.Exec(t, `
SET TRACING=on;
BEGIN;
UPDATE test SET x = 100 WHERE x = 1;
`)
// The second connection updates the same row in a high-priority transaction
// with tracing enabled, and commits.
server2Conn.Exec(t, `
SET TRACING=on;
BEGIN PRIORITY HIGH;
UPDATE test SET x = 1000 WHERE x = 1;
COMMIT;
SET TRACING=off;
`)
// The first connection's commit is now expected to fail with a
// "restart transaction" error.
server1Conn.ExpectErr(
t,
"^pq: restart transaction.+",
`
COMMIT;
SET TRACING=off;
`,
)

// Query the endpoint on the server at index 2 — which served neither of the
// connections used above — with the isAdmin option set.
var resp serverpb.ListContentionEventsResponse
require.NoError(t,
getStatusJSONProtoWithAdminOption(
testCluster.Server(2),
"contention_events",
&resp,
true /* isAdmin */),
)

require.GreaterOrEqualf(t, len(resp.Events.IndexContentionEvents), 1,
"expecting at least 1 contention event, but found none")

// Search for an event on index 1 of the test table.
// NOTE(review): index ID 1 is presumably the primary index — confirm.
found := false
for _, event := range resp.Events.IndexContentionEvents {
if event.TableID == descpb.ID(testTableID) && event.IndexID == descpb.IndexID(1) {
found = true
break
}
}

require.True(t, found,
"expect to find contention event for table %d, but found %+v", testTableID, resp)
}
50 changes: 48 additions & 2 deletions pkg/server/tenant_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,55 @@ func (t *tenantStatusServer) CancelSession(
}

// ListContentionEvents returns the contention events of the whole tenant
// cluster rather than only those recorded by the local SQL pod.
//
// After an up-front view-activity permission check, it uses iteratePods to
// call ListLocalContentionEvents on each pod and merges the returned
// registries into one response. Errors handed to errorFn are recorded in the
// response's Errors list, with the pod's SQL instance ID stored in the NodeID
// field, instead of aborting the request. An error is returned only when the
// permission check fails or iteratePods itself returns one.
func (t *tenantStatusServer) ListContentionEvents(
	ctx context.Context, req *serverpb.ListContentionEventsRequest,
) (*serverpb.ListContentionEventsResponse, error) {
	ctx = propagateGatewayMetadata(ctx)
	ctx = t.AnnotateCtx(ctx)

	// Check permissions early to avoid fan-out to all nodes.
	if err := t.hasViewActivityPermissions(ctx); err != nil {
		return nil, err
	}

	// Accumulator shared by the callbacks below.
	// NOTE(review): responseFn and errorFn mutate it without locking, so this
	// relies on iteratePods invoking them one at a time — confirm.
	var response serverpb.ListContentionEventsResponse

	// podFn fetches the pod-local events from a single pod.
	podFn := func(ctx context.Context, client interface{}, _ base.SQLInstanceID) (interface{}, error) {
		statusClient := client.(serverpb.StatusClient)
		resp, err := statusClient.ListLocalContentionEvents(ctx, req)
		if err != nil {
			return nil, err
		}
		// An error embedded in the pod's response is treated as a failure of
		// that pod; only the first message is propagated.
		if len(resp.Errors) > 0 {
			return nil, errors.Errorf("%s", resp.Errors[0].Message)
		}
		return resp, nil
	}
	// responseFn folds one pod's events into the combined registry.
	responseFn := func(_ base.SQLInstanceID, nodeResp interface{}) {
		if nodeResp == nil {
			return
		}
		events := nodeResp.(*serverpb.ListContentionEventsResponse).Events
		response.Events = contention.MergeSerializedRegistries(response.Events, events)
	}
	// errorFn records a per-pod failure in the response.
	errorFn := func(instanceID base.SQLInstanceID, err error) {
		errResponse := serverpb.ListActivityError{
			NodeID:  roachpb.NodeID(instanceID),
			Message: err.Error(),
		}
		response.Errors = append(response.Errors, errResponse)
	}

	if err := t.iteratePods(
		ctx,
		"contention events list",
		t.dialCallback,
		podFn,
		responseFn,
		errorFn,
	); err != nil {
		return nil, err
	}
	return &response, nil
}

func (t *tenantStatusServer) ResetSQLStats(
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/contention/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ func NewRegistry() *Registry {
func (r *Registry) AddContentionEvent(c roachpb.ContentionEvent) {
r.globalLock.Lock()
defer r.globalLock.Unlock()
// Remove the tenant ID prefix if there is any.
c.Key, _, _ = keys.DecodeTenantPrefix(c.Key)
_, rawTableID, rawIndexID, err := keys.DecodeTableIDIndexID(c.Key)
if err != nil {
// The key is not a valid SQL key, so we store it in a separate cache.
Expand Down

0 comments on commit 1057ea6

Please sign in to comment.