Skip to content

Commit

Permalink
sql,server: exposes TxnIDResolution rpc endpoint
Browse files Browse the repository at this point in the history
This commit introduces the new TxnIDResolution RPC endpoint. This RPC
endpoint utilizes the new txnidcache.Resolves interface to perform
transaction ID resolution.

Partially addresses cockroachdb#74487

Release note: None
  • Loading branch information
Azhng committed Feb 16, 2022
1 parent 13b80bb commit 4813ecb
Show file tree
Hide file tree
Showing 20 changed files with 282 additions and 32 deletions.
45 changes: 45 additions & 0 deletions docs/generated/http/full.md
Original file line number Diff line number Diff line change
Expand Up @@ -4278,6 +4278,51 @@ UserSQLRolesResponse returns a list of roles for the logged SQL user.



## TxnIDResolution





Support status: [reserved](#support-status)

#### Request Parameters




Request object for issuing Transaction ID Resolution.


| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| CoordinatorNodeID | [int32](#cockroach.server.serverpb.TxnIDResolutionRequest-int32) | | | [reserved](#support-status) |
| txnIDs | [bytes](#cockroach.server.serverpb.TxnIDResolutionRequest-bytes) | repeated | | [reserved](#support-status) |







#### Response Parameters




Response object for issuing Transaction ID Resolution.


| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| resolvedTxnIDs | [cockroach.sql.contentionpb.ResolvedTxnID](#cockroach.server.serverpb.TxnIDResolutionResponse-cockroach.sql.contentionpb.ResolvedTxnID) | repeated | | [reserved](#support-status) |







## RequestCA

`GET /_join/v1/ca`
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/serverccl/statusccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
Expand Down
55 changes: 55 additions & 0 deletions pkg/ccl/serverccl/statusccl/tenant_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -82,6 +83,10 @@ func TestTenantStatusAPI(t *testing.T) {
t.Run("index_usage_stats", func(t *testing.T) {
testIndexUsageForTenants(t, testHelper)
})

t.Run("txn_id_resolution", func(t *testing.T) {
testTxnIDResolutionRPC(ctx, t, testHelper)
})
}

func TestTenantCannotSeeNonTenantStats(t *testing.T) {
Expand Down Expand Up @@ -887,3 +892,53 @@ func testTenantStatusCancelQuery(ctx context.Context, t *testing.T, helper *tena
require.Equal(t, false, cancelQueryResp.Canceled)
require.Equal(t, fmt.Sprintf("query ID %s not found", query.ID), cancelQueryResp.Error)
}

// testTxnIDResolutionRPC tests the reachability of TxnIDResolution RPC. The
// underlying implementation correctness is tested within
// pkg/sql/contention/txnidcache.
func testTxnIDResolutionRPC(ctx context.Context, t *testing.T, helper *tenantTestHelper) {
run := func(sqlConn *sqlutils.SQLRunner, status serverpb.SQLStatusServer, coordinatorNodeID int32) {
sqlConn.Exec(t, "SET application_name='test1'")

sqlConn.Exec(t, "BEGIN")
result := sqlConn.QueryStr(t, `
SELECT id
FROM crdb_internal.node_transactions
WHERE application_name = 'test1'`)
require.Equal(t, 1 /* expected */, len(result),
"expected only one active txn, but there are %d active txns found", len(result))
txnID := uuid.FromStringOrNil(result[0][0])
require.False(t, uuid.Nil.Equal(txnID),
"expected a valid txnID, but %+v is found", result)
sqlConn.Exec(t, "COMMIT")

testutils.SucceedsWithin(t, func() error {
resp, err := status.TxnIDResolution(ctx, &serverpb.TxnIDResolutionRequest{
CoordinatorNodeID: coordinatorNodeID,
TxnIDs: []uuid.UUID{txnID},
})
require.NoError(t, err)
if len(resp.ResolvedTxnIDs) != 1 {
return errors.New("txnID not found")
}
require.True(t, resp.ResolvedTxnIDs[0].TxnID.Equal(txnID))
return nil
}, 3*time.Second)
}

t.Run("regular_cluster", func(t *testing.T) {
status :=
helper.hostCluster.Server(0 /* idx */).StatusServer().(serverpb.SQLStatusServer)
sqlConn := helper.hostCluster.ServerConn(0 /* idx */)
run(sqlutils.MakeSQLRunner(sqlConn), status, 1 /* coordinatorNodeID */)
})

t.Run("tenant_cluster", func(t *testing.T) {
// Select a different tenant status server here so a pod-to-pod RPC will
// happen.
status :=
helper.testCluster().tenantStatusSrv(2 /* idx */)
sqlConn := helper.testCluster().tenantConn(0 /* idx */)
run(sqlConn, status, 1 /* coordinatorNodeID */)
})
}
2 changes: 2 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ go_library(
"//pkg/sql/catalog/systemschema",
"//pkg/sql/colexec",
"//pkg/sql/contention",
"//pkg/sql/contention/txnidcache",
"//pkg/sql/contentionpb",
"//pkg/sql/descmetadata",
"//pkg/sql/distsql",
"//pkg/sql/execinfra",
Expand Down
1 change: 1 addition & 0 deletions pkg/server/serverpb/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type SQLStatusServer interface {
ResetIndexUsageStats(context.Context, *ResetIndexUsageStatsRequest) (*ResetIndexUsageStatsResponse, error)
TableIndexStats(context.Context, *TableIndexStatsRequest) (*TableIndexStatsResponse, error)
UserSQLRoles(ctx context.Context, request *UserSQLRolesRequest) (*UserSQLRolesResponse, error)
TxnIDResolution(context.Context, *TxnIDResolutionRequest) (*TxnIDResolutionResponse, error)
}

// OptionalNodesStatusServer is a StatusServer that is only optionally present
Expand Down
20 changes: 20 additions & 0 deletions pkg/server/serverpb/status.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1483,6 +1483,24 @@ message UserSQLRolesResponse {
repeated string roles = 1;
}

// Request object for issuing Transaction ID Resolution.
message TxnIDResolutionRequest {
int32 CoordinatorNodeID = 1 [
(gogoproto.customname) = "CoordinatorNodeID"
];

repeated bytes txnIDs = 2 [
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID",
(gogoproto.nullable) = false
];
}

// Response object for issuing Transaction ID Resolution.
message TxnIDResolutionResponse {
repeated cockroach.sql.contentionpb.ResolvedTxnID resolvedTxnIDs = 1 [
(gogoproto.nullable) = false];
}

service Status {
// Certificates retrieves a copy of the TLS certificates.
rpc Certificates(CertificatesRequest) returns (CertificatesResponse) {
Expand Down Expand Up @@ -1875,4 +1893,6 @@ service Status {
get: "/_status/sqlroles"
};
}

rpc TxnIDResolution(TxnIDResolutionRequest) returns (TxnIDResolutionResponse) {}
}
72 changes: 72 additions & 0 deletions pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/contention"
"github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache"
"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirecancel"
"github.com/cockroachdb/cockroach/pkg/sql/roleoption"
Expand All @@ -69,6 +71,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
gwruntime "github.com/grpc-ecosystem/grpc-gateway/runtime"
raft "go.etcd.io/etcd/raft/v3"
Expand Down Expand Up @@ -366,6 +369,53 @@ func (b *baseStatusServer) ListLocalDistSQLFlows(
return response, nil
}

func (b *baseStatusServer) localTxnIDResolution(
req *serverpb.TxnIDResolutionRequest,
) *serverpb.TxnIDResolutionResponse {
txnIDCache := b.sqlServer.pgServer.SQLServer.GetTxnIDCache()

unresolvedTxnIDs := make(map[uuid.UUID]struct{}, len(req.TxnIDs))
for _, txnID := range req.TxnIDs {
unresolvedTxnIDs[txnID] = struct{}{}
}

resp := &serverpb.TxnIDResolutionResponse{
ResolvedTxnIDs: make([]contentionpb.ResolvedTxnID, 0, len(req.TxnIDs)),
}

// Optimistically attempt to resolve txn IDs. However, if the optimistic
// attempt fails, we flush the txnID cache and try again.
unresolvedTxnIDs, resp.ResolvedTxnIDs =
resolveTxnIDHelper(txnIDCache, unresolvedTxnIDs, resp.ResolvedTxnIDs)

if len(unresolvedTxnIDs) > 0 {
txnIDCache.Flush()

// If there are any unresolved txn IDs at this point, we can simply
// discard them. The caller will perform retry accordingly based
// on the response we send back.
_, resp.ResolvedTxnIDs =
resolveTxnIDHelper(txnIDCache, unresolvedTxnIDs, resp.ResolvedTxnIDs)
}

return resp
}

func resolveTxnIDHelper(
txnIDCache *txnidcache.Cache, query map[uuid.UUID]struct{}, result []contentionpb.ResolvedTxnID,
) (remaining map[uuid.UUID]struct{}, _ []contentionpb.ResolvedTxnID) {
for txnID := range query {
if txnFingerprintID, found := txnIDCache.Lookup(txnID); found {
delete(query, txnID)
result = append(result, contentionpb.ResolvedTxnID{
TxnID: txnID,
TxnFingerprintID: txnFingerprintID,
})
}
}
return query, result
}

// A statusServer provides a RESTful status API.
type statusServer struct {
*baseStatusServer
Expand Down Expand Up @@ -3027,3 +3077,25 @@ func (s *statusServer) JobStatus(

return &serverpb.JobStatusResponse{Job: res}, nil
}

func (s *statusServer) TxnIDResolution(
ctx context.Context, req *serverpb.TxnIDResolutionRequest,
) (*serverpb.TxnIDResolutionResponse, error) {
ctx = s.AnnotateCtx(propagateGatewayMetadata(ctx))
if _, err := s.privilegeChecker.requireAdminUser(ctx); err != nil {
return nil, err
}

nodeID := roachpb.NodeID(req.CoordinatorNodeID)
local := nodeID == s.gossip.NodeID.Get()
if local {
return s.localTxnIDResolution(req), nil
}

statusClient, err := s.dialNode(ctx, nodeID)
if err != nil {
return nil, err
}

return statusClient.TxnIDResolution(ctx, req)
}
26 changes: 26 additions & 0 deletions pkg/server/tenant_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -1042,3 +1042,29 @@ func (t *tenantStatusServer) NodesList(
}
return &resp, err
}

func (t *tenantStatusServer) TxnIDResolution(
ctx context.Context, req *serverpb.TxnIDResolutionRequest,
) (*serverpb.TxnIDResolutionResponse, error) {
ctx = t.AnnotateCtx(propagateGatewayMetadata(ctx))
if _, err := t.privilegeChecker.requireAdminUser(ctx); err != nil {
return nil, err
}

instanceID := base.SQLInstanceID(req.CoordinatorNodeID)
local := instanceID == t.sqlServer.SQLInstanceID()
if local {
return t.localTxnIDResolution(req), nil
}

instance, err := t.sqlServer.sqlInstanceProvider.GetInstance(ctx, instanceID)
if err != nil {
return nil, err
}
statusClient, err := t.dialPod(ctx, instanceID, instance.InstanceAddr)
if err != nil {
return nil, err
}

return statusClient.TxnIDResolution(ctx, req)
}
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ go_library(
"//pkg/sql/colflow",
"//pkg/sql/contention",
"//pkg/sql/contention/txnidcache",
"//pkg/sql/contentionpb",
"//pkg/sql/covering",
"//pkg/sql/delegate",
"//pkg/sql/distsql",
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache"
"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec/explain"
Expand Down Expand Up @@ -1958,7 +1958,7 @@ func (ex *connExecutor) onTxnRestart(ctx context.Context) {
func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) {
// Transaction fingerprint ID will be available once transaction finishes
// execution.
ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{
ex.txnIDCacheWriter.Record(contentionpb.ResolvedTxnID{
TxnID: txnID,
TxnFingerprintID: roachpb.InvalidTransactionFingerprintID,
})
Expand Down Expand Up @@ -2033,7 +2033,7 @@ func (ex *connExecutor) recordTransactionFinish(
}
ex.metrics.EngineMetrics.SQLTxnLatency.RecordValue(txnTime.Nanoseconds())

ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{
ex.txnIDCacheWriter.Record(contentionpb.ResolvedTxnID{
TxnID: ev.txnID,
TxnFingerprintID: transactionFingerprintID,
})
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/contention/txnidcache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/contention/contentionutils",
"//pkg/sql/contentionpb",
"//pkg/util/encoding",
"//pkg/util/metric",
"//pkg/util/stop",
Expand All @@ -42,6 +43,7 @@ go_test(
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/contentionpb",
"//pkg/sql/sessiondata",
"//pkg/sql/tests",
"//pkg/testutils",
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/contention/txnidcache/concurrent_write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ import (
"sync"

"github.com/cockroachdb/cockroach/pkg/sql/contention/contentionutils"
"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
)

// blockSize is chosen as 168 since each ResolvedTxnID is 24 byte.
// 168 * 24 = 4032 bytes < 4KiB page size.
const blockSize = 168

type block [blockSize]ResolvedTxnID
type block [blockSize]contentionpb.ResolvedTxnID

var blockPool = &sync.Pool{
New: func() interface{} {
Expand Down Expand Up @@ -69,7 +70,7 @@ func newConcurrentWriteBuffer(sink blockSink) *concurrentWriteBuffer {

// Record records a mapping from txnID to its corresponding transaction
// fingerprint ID. Record is safe to be used concurrently.
func (c *concurrentWriteBuffer) Record(resolvedTxnID ResolvedTxnID) {
func (c *concurrentWriteBuffer) Record(resolvedTxnID contentionpb.ResolvedTxnID) {
c.guard.AtomicWrite(func(writerIdx int64) {
c.guard.block[writerIdx] = resolvedTxnID
})
Expand Down
Loading

0 comments on commit 4813ecb

Please sign in to comment.