Skip to content

Commit

Permalink
sql,server: exposes TxnIDResolution rpc endpoint
Browse files Browse the repository at this point in the history
This commit introduces the new TxnIDResolution RPC endpoint. This RPC
endpoint utilizes the new txnidcache.Reader interface to perform
transaction ID resolution.

Partially addresses cockroachdb#74487

Release note: None
  • Loading branch information
Azhng authored and RajivTS committed Mar 6, 2022
1 parent d700758 commit 16dfcd9
Show file tree
Hide file tree
Showing 21 changed files with 317 additions and 49 deletions.
58 changes: 58 additions & 0 deletions docs/generated/http/full.md
Original file line number Diff line number Diff line change
Expand Up @@ -4282,6 +4282,64 @@ UserSQLRolesResponse returns a list of roles for the logged SQL user.



## TxnIDResolution



TxnIDResolution is used by the contention event store to resolve
transaction IDs into transaction fingerprint IDs.
This RPC does not have a corresponding HTTP endpoint on purpose, since
DB Console should never directly query this endpoint.

The API contract is the following:
- if the server can resolve the transaction IDs in the RPC request, they
will be returned in the RPC response.
- if the server is not able to resolve the transaction IDs, it will
instruct the transaction ID cache to drain its write buffer. (Since the
transaction ID cache's write path is asynchronous, the transaction ID
requested by the client might not be available in the cache yet).
The client is responsible for retrying if the requested transaction ID
is not returned in the RPC response.

Support status: [reserved](#support-status)

#### Request Parameters




Request object for issuing Transaction ID Resolution.


| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| coordinator_id | [string](#cockroach.server.serverpb.TxnIDResolutionRequest-string) | | coordinator_id is either the NodeID or SQLInstanceID depending on whether the transaction is executed on a system tenant or a regular tenant. | [reserved](#support-status) |
| txnIDs | [bytes](#cockroach.server.serverpb.TxnIDResolutionRequest-bytes) | repeated | | [reserved](#support-status) |







#### Response Parameters




Response object for issuing Transaction ID Resolution.


| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| resolvedTxnIDs | [cockroach.sql.contentionpb.ResolvedTxnID](#cockroach.server.serverpb.TxnIDResolutionResponse-cockroach.sql.contentionpb.ResolvedTxnID) | repeated | | [reserved](#support-status) |







## RequestCA

`GET /_join/v1/ca`
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/serverccl/statusccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
Expand Down
62 changes: 62 additions & 0 deletions pkg/ccl/serverccl/statusccl/tenant_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -82,6 +83,10 @@ func TestTenantStatusAPI(t *testing.T) {
t.Run("index_usage_stats", func(t *testing.T) {
testIndexUsageForTenants(t, testHelper)
})

t.Run("txn_id_resolution", func(t *testing.T) {
testTxnIDResolutionRPC(ctx, t, testHelper)
})
}

func TestTenantCannotSeeNonTenantStats(t *testing.T) {
Expand Down Expand Up @@ -887,3 +892,60 @@ func testTenantStatusCancelQuery(ctx context.Context, t *testing.T, helper *tena
require.Equal(t, false, cancelQueryResp.Canceled)
require.Equal(t, fmt.Sprintf("query ID %s not found", query.ID), cancelQueryResp.Error)
}

// testTxnIDResolutionRPC tests the reachability of TxnIDResolution RPC. The
// underlying implementation correctness is tested within
// pkg/sql/contention/txnidcache.
// testTxnIDResolutionRPC checks that the TxnIDResolution RPC is reachable,
// both on the host cluster and across tenant pods. It only verifies that a
// freshly committed transaction's ID can eventually be resolved; the
// correctness of the underlying cache is covered by the tests in
// pkg/sql/contention/txnidcache.
func testTxnIDResolutionRPC(ctx context.Context, t *testing.T, helper *tenantTestHelper) {
	// verify opens a transaction on runner, captures its ID, commits it, and
	// then polls statusSrv until the ID is resolved on the given coordinator.
	verify := func(runner *sqlutils.SQLRunner, statusSrv serverpb.SQLStatusServer, coordinatorNodeID int32) {
		// Tag the session so its transaction can be found in
		// crdb_internal.node_transactions.
		runner.Exec(t, "SET application_name='test1'")

		runner.Exec(t, "BEGIN")
		rows := runner.QueryStr(t, `
SELECT
id
FROM
crdb_internal.node_transactions
WHERE
application_name = 'test1'`)
		require.Equal(t, 1 /* expected */, len(rows),
			"expected only one active txn, but there are %d active txns found", len(rows))
		txnID := uuid.FromStringOrNil(rows[0][0])
		require.False(t, uuid.Nil.Equal(txnID),
			"expected a valid txnID, but %+v is found", rows)
		runner.Exec(t, "COMMIT")

		// resolve issues one RPC attempt. An empty or oversized response is
		// reported as a retryable error; everything else is asserted directly.
		resolve := func() error {
			request := &serverpb.TxnIDResolutionRequest{
				CoordinatorID: strconv.Itoa(int(coordinatorNodeID)),
				TxnIDs:        []uuid.UUID{txnID},
			}
			res, err := statusSrv.TxnIDResolution(ctx, request)
			require.NoError(t, err)
			if numResolved := len(res.ResolvedTxnIDs); numResolved != 1 {
				return errors.Newf("expected RPC response to have length of 1, but "+
					"it is %d", len(res.ResolvedTxnIDs))
			}
			resolved := res.ResolvedTxnIDs[0]
			require.Equal(t, txnID, resolved.TxnID,
				"expected to find txn %s on coordinator node %d, but it "+
					"was not", txnID.String(), coordinatorNodeID)
			require.NotEqual(t, roachpb.InvalidTransactionFingerprintID, resolved.TxnFingerprintID)
			return nil
		}
		testutils.SucceedsWithin(t, resolve, 3*time.Second)
	}

	t.Run("regular_cluster", func(t *testing.T) {
		hostServer := helper.hostCluster.Server(0 /* idx */)
		statusSrv := hostServer.StatusServer().(serverpb.SQLStatusServer)
		db := helper.hostCluster.ServerConn(0 /* idx */)
		verify(sqlutils.MakeSQLRunner(db), statusSrv, 1 /* coordinatorNodeID */)
	})

	t.Run("tenant_cluster", func(t *testing.T) {
		// The status server is taken from a different pod than the SQL
		// connection so that the request has to travel pod-to-pod.
		statusSrv := helper.testCluster().tenantStatusSrv(2 /* idx */)
		runner := helper.testCluster().tenantConn(0 /* idx */)
		verify(runner, statusSrv, 1 /* coordinatorNodeID */)
	})
}
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ go_library(
"//pkg/sql/catalog/systemschema",
"//pkg/sql/colexec",
"//pkg/sql/contention",
"//pkg/sql/contentionpb",
"//pkg/sql/descmetadata",
"//pkg/sql/distsql",
"//pkg/sql/execinfra",
Expand Down
1 change: 1 addition & 0 deletions pkg/server/serverpb/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type SQLStatusServer interface {
ResetIndexUsageStats(context.Context, *ResetIndexUsageStatsRequest) (*ResetIndexUsageStatsResponse, error)
TableIndexStats(context.Context, *TableIndexStatsRequest) (*TableIndexStatsResponse, error)
UserSQLRoles(ctx context.Context, request *UserSQLRolesRequest) (*UserSQLRolesResponse, error)
TxnIDResolution(context.Context, *TxnIDResolutionRequest) (*TxnIDResolutionResponse, error)
}

// OptionalNodesStatusServer is a StatusServer that is only optionally present
Expand Down
34 changes: 34 additions & 0 deletions pkg/server/serverpb/status.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1489,6 +1489,24 @@ message UserSQLRolesResponse {
repeated string roles = 1;
}

// Request object for issuing Transaction ID Resolution.
message TxnIDResolutionRequest {
// coordinator_id is either the NodeID or SQLInstanceID depending on whether
// the transaction is executed on a system tenant or a regular tenant.
string coordinator_id = 1 [(gogoproto.customname) = "CoordinatorID"];

// txnIDs is the list of transaction IDs that the client wants resolved into
// transaction fingerprint IDs. Each entry is sent as bytes and is mapped to
// uuid.UUID in the generated Go code.
repeated bytes txnIDs = 2 [
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID",
(gogoproto.nullable) = false
];
}

// Response object for issuing Transaction ID Resolution.
message TxnIDResolutionResponse {
// resolvedTxnIDs contains an entry for each requested transaction ID that
// the server was able to resolve. Transaction IDs that could not be
// resolved are omitted.
repeated cockroach.sql.contentionpb.ResolvedTxnID resolvedTxnIDs = 1 [
(gogoproto.nullable) = false];
}

service Status {
// Certificates retrieves a copy of the TLS certificates.
rpc Certificates(CertificatesRequest) returns (CertificatesResponse) {
Expand Down Expand Up @@ -1881,4 +1899,20 @@ service Status {
get: "/_status/sqlroles"
};
}

// TxnIDResolution is used by the contention event store to resolve
// transaction IDs into transaction fingerprint IDs.
// This RPC does not have a corresponding HTTP endpoint on purpose, since
// DB Console should never directly query this endpoint.
//
// The API contract is the following:
// - if the server can resolve the transaction IDs in the RPC request, they
// will be returned in the RPC response.
// - if the server is not able to resolve the transaction IDs, it will
// instruct the transaction ID cache to drain its write buffer. (Since the
// transaction ID cache's write path is asynchronous, the transaction ID
// requested by the client might not be available in the cache yet).
// The client is responsible for retrying if the requested transaction ID
// is not returned in the RPC response.
rpc TxnIDResolution(TxnIDResolutionRequest) returns (TxnIDResolutionResponse) {}
}
59 changes: 59 additions & 0 deletions pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/contention"
"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirecancel"
"github.com/cockroachdb/cockroach/pkg/sql/roleoption"
Expand All @@ -69,6 +70,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
gwruntime "github.com/grpc-ecosystem/grpc-gateway/runtime"
raft "go.etcd.io/etcd/raft/v3"
Expand Down Expand Up @@ -366,6 +368,39 @@ func (b *baseStatusServer) ListLocalDistSQLFlows(
return response, nil
}

// localTxnIDResolution looks up every transaction ID in req in the local
// transaction ID cache and returns the ones that were found, each paired with
// its transaction fingerprint ID. Transaction IDs that are not in the cache
// are omitted from the response.
//
// If at least one requested transaction ID could not be resolved, the cache
// is asked to drain its write buffer so that a later retry by the client has
// a chance to succeed.
func (b *baseStatusServer) localTxnIDResolution(
	req *serverpb.TxnIDResolutionRequest,
) *serverpb.TxnIDResolutionResponse {
	txnIDCache := b.sqlServer.pgServer.SQLServer.GetTxnIDCache()

	// Start with every requested ID marked as unresolved; entries are removed
	// below as lookups succeed.
	unresolvedTxnIDs := make(map[uuid.UUID]struct{}, len(req.TxnIDs))
	for _, txnID := range req.TxnIDs {
		unresolvedTxnIDs[txnID] = struct{}{}
	}

	resp := &serverpb.TxnIDResolutionResponse{
		ResolvedTxnIDs: make([]contentionpb.ResolvedTxnID, 0, len(req.TxnIDs)),
	}

	for i := range req.TxnIDs {
		txnFingerprintID, found := txnIDCache.Lookup(req.TxnIDs[i])
		if !found {
			continue
		}
		resp.ResolvedTxnIDs = append(resp.ResolvedTxnIDs, contentionpb.ResolvedTxnID{
			TxnID:            req.TxnIDs[i],
			TxnFingerprintID: txnFingerprintID,
		})
		// Without this removal the set would never shrink and the drain below
		// would fire on every non-empty request, even fully resolved ones.
		delete(unresolvedTxnIDs, req.TxnIDs[i])
	}

	// If we encounter any transaction ID that we cannot resolve, we tell the
	// txnID cache to drain its write buffer (note: The .DrainWriteBuffer() call
	// is asynchronous). The client of this RPC will perform retries.
	if len(unresolvedTxnIDs) > 0 {
		txnIDCache.DrainWriteBuffer()
	}

	return resp
}

// A statusServer provides a RESTful status API.
type statusServer struct {
*baseStatusServer
Expand Down Expand Up @@ -3028,3 +3063,27 @@ func (s *statusServer) JobStatus(

return &serverpb.JobStatusResponse{Job: res}, nil
}

// TxnIDResolution implements the TxnIDResolution RPC for statusServer. The
// caller must pass the admin privilege check. req.CoordinatorID is parsed as
// a node ID: if it refers to the local node the request is answered from the
// local transaction ID cache, otherwise it is forwarded to the status server
// of the requested node.
//
// An unparsable CoordinatorID yields an InvalidArgument gRPC error; errors
// from the privilege check or from dialing the remote node are returned
// as-is.
func (s *statusServer) TxnIDResolution(
	ctx context.Context, req *serverpb.TxnIDResolutionRequest,
) (*serverpb.TxnIDResolutionResponse, error) {
	ctx = s.AnnotateCtx(propagateGatewayMetadata(ctx))
	if _, err := s.privilegeChecker.requireAdminUser(ctx); err != nil {
		return nil, err
	}

	requestedNodeID, local, err := s.parseNodeID(req.CoordinatorID)
	if err != nil {
		// The message is passed as an argument, not as the format string, so
		// that a '%' in it is never interpreted as a formatting verb.
		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
	}
	if local {
		return s.localTxnIDResolution(req), nil
	}

	statusClient, err := s.dialNode(ctx, requestedNodeID)
	if err != nil {
		return nil, err
	}

	return statusClient.TxnIDResolution(ctx, req)
}
28 changes: 28 additions & 0 deletions pkg/server/tenant_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -1042,3 +1042,31 @@ func (t *tenantStatusServer) NodesList(
}
return &resp, err
}

// TxnIDResolution implements the TxnIDResolution RPC for tenantStatusServer.
// The caller must pass the admin privilege check. req.CoordinatorID is parsed
// as a SQL instance ID: if it refers to the local instance the request is
// answered from the local transaction ID cache, otherwise the instance's
// address is looked up and the request is forwarded to that pod.
//
// An unparsable CoordinatorID yields an InvalidArgument gRPC error; errors
// from the privilege check, the instance lookup, or dialing the pod are
// returned as-is.
func (t *tenantStatusServer) TxnIDResolution(
	ctx context.Context, req *serverpb.TxnIDResolutionRequest,
) (*serverpb.TxnIDResolutionResponse, error) {
	ctx = t.AnnotateCtx(propagateGatewayMetadata(ctx))
	if _, err := t.privilegeChecker.requireAdminUser(ctx); err != nil {
		return nil, err
	}

	instanceID, local, err := t.parseInstanceID(req.CoordinatorID)
	if err != nil {
		// The message is passed as an argument, not as the format string, so
		// that a '%' in it is never interpreted as a formatting verb.
		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
	}
	if local {
		return t.localTxnIDResolution(req), nil
	}

	instance, err := t.sqlServer.sqlInstanceProvider.GetInstance(ctx, instanceID)
	if err != nil {
		return nil, err
	}
	statusClient, err := t.dialPod(ctx, instanceID, instance.InstanceAddr)
	if err != nil {
		return nil, err
	}

	return statusClient.TxnIDResolution(ctx, req)
}
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ go_library(
"//pkg/sql/colflow",
"//pkg/sql/contention",
"//pkg/sql/contention/txnidcache",
"//pkg/sql/contentionpb",
"//pkg/sql/covering",
"//pkg/sql/delegate",
"//pkg/sql/distsql",
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache"
"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec/explain"
Expand Down Expand Up @@ -1952,7 +1952,7 @@ func (ex *connExecutor) onTxnRestart(ctx context.Context) {
func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) {
// Transaction fingerprint ID will be available once transaction finishes
// execution.
ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{
ex.txnIDCacheWriter.Record(contentionpb.ResolvedTxnID{
TxnID: txnID,
TxnFingerprintID: roachpb.InvalidTransactionFingerprintID,
})
Expand Down Expand Up @@ -2027,7 +2027,7 @@ func (ex *connExecutor) recordTransactionFinish(
}
ex.metrics.EngineMetrics.SQLTxnLatency.RecordValue(txnTime.Nanoseconds())

ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{
ex.txnIDCacheWriter.Record(contentionpb.ResolvedTxnID{
TxnID: ev.txnID,
TxnFingerprintID: transactionFingerprintID,
})
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/contention/txnidcache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/contention/contentionutils",
"//pkg/sql/contentionpb",
"//pkg/util/encoding",
"//pkg/util/metric",
"//pkg/util/stop",
Expand All @@ -42,6 +43,7 @@ go_test(
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/contentionpb",
"//pkg/sql/sessiondata",
"//pkg/sql/tests",
"//pkg/testutils",
Expand Down
Loading

0 comments on commit 16dfcd9

Please sign in to comment.