diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index 9fdfb19932da..86ac40ed69e3 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -4162,6 +4162,51 @@ UserSQLRolesResponse returns a list of roles for the logged SQL user. +## TxnIDResolution + + + + + +Support status: [reserved](#support-status) + +#### Request Parameters + + + + +Request object for issuing Transaction ID Resolution. + + +| Field | Type | Label | Description | Support status | +| ----- | ---- | ----- | ----------- | -------------- | +| CoordinatorNodeID | [int32](#cockroach.server.serverpb.TxnIDResolutionRequest-int32) | | | [reserved](#support-status) | +| txnIDs | [bytes](#cockroach.server.serverpb.TxnIDResolutionRequest-bytes) | repeated | | [reserved](#support-status) | + + + + + + + +#### Response Parameters + + + + +Response object for issuing Transaction ID Resolution. + + +| Field | Type | Label | Description | Support status | +| ----- | ---- | ----- | ----------- | -------------- | +| resolvedTxnIDs | [cockroach.sql.contentionpb.ResolvedTxnID](#cockroach.server.serverpb.TxnIDResolutionResponse-cockroach.sql.contentionpb.ResolvedTxnID) | repeated | | [reserved](#support-status) | + + + + + + + ## RequestCA `GET /_join/v1/ca` diff --git a/pkg/ccl/serverccl/statusccl/BUILD.bazel b/pkg/ccl/serverccl/statusccl/BUILD.bazel index adafbeec17c6..5d4be2a0ae45 100644 --- a/pkg/ccl/serverccl/statusccl/BUILD.bazel +++ b/pkg/ccl/serverccl/statusccl/BUILD.bazel @@ -54,6 +54,7 @@ go_test( "//pkg/util/log", "//pkg/util/randutil", "//pkg/util/timeutil", + "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", ], diff --git a/pkg/ccl/serverccl/statusccl/tenant_status_test.go b/pkg/ccl/serverccl/statusccl/tenant_status_test.go index ef0a87c06a17..292cfa8124ef 100644 --- a/pkg/ccl/serverccl/statusccl/tenant_status_test.go +++ b/pkg/ccl/serverccl/statusccl/tenant_status_test.go @@ -37,6 +37,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -82,6 +83,10 @@ func TestTenantStatusAPI(t *testing.T) { t.Run("index_usage_stats", func(t *testing.T) { testIndexUsageForTenants(t, testHelper) }) + + t.Run("txn_id_resolution", func(t *testing.T) { + testTxnIDResolutionRPC(ctx, t, testHelper) + }) } func TestTenantCannotSeeNonTenantStats(t *testing.T) { @@ -845,3 +850,53 @@ func testTenantStatusCancelQuery(ctx context.Context, t *testing.T, helper *tena require.Equal(t, false, cancelQueryResp.Canceled) require.Equal(t, fmt.Sprintf("query ID %s not found", query.ID), cancelQueryResp.Error) } + +// testTxnIDResolutionRPC tests the reachability of TxnIDResolution RPC. The +// underlying implementation correctness is tested within +// pkg/sql/contention/txnidcache. +func testTxnIDResolutionRPC(ctx context.Context, t *testing.T, helper *tenantTestHelper) { + run := func(sqlConn *sqlutils.SQLRunner, status serverpb.SQLStatusServer, coordinatorNodeID int32) { + sqlConn.Exec(t, "SET application_name='test1'") + + sqlConn.Exec(t, "BEGIN") + result := sqlConn.QueryStr(t, ` +SELECT id +FROM crdb_internal.node_transactions +WHERE application_name = 'test1'`) + require.Equal(t, 1 /* expected */, len(result), + "expected only one active txn, but there are %d active txns found", len(result)) + txnID := uuid.FromStringOrNil(result[0][0]) + require.False(t, uuid.Nil.Equal(txnID), + "expected a valid txnID, but %+v is found", result) + sqlConn.Exec(t, "COMMIT") + + testutils.SucceedsWithin(t, func() error { + resp, err := status.TxnIDResolution(ctx, &serverpb.TxnIDResolutionRequest{ + CoordinatorNodeID: coordinatorNodeID, + TxnIDs: []uuid.UUID{txnID}, + }) + require.NoError(t, err) + if len(resp.ResolvedTxnIDs) != 1 { + return errors.New("txnID not found") + } + require.True(t, resp.ResolvedTxnIDs[0].TxnID.Equal(txnID)) + return nil + }, 3*time.Second) + } + + t.Run("regular_cluster", func(t *testing.T) { + status := + helper.hostCluster.Server(0 /* idx */).StatusServer().(serverpb.SQLStatusServer) + sqlConn := helper.hostCluster.ServerConn(0 /* idx */) + run(sqlutils.MakeSQLRunner(sqlConn), status, 1 /* coordinatorNodeID */) + }) + + t.Run("tenant_cluster", func(t *testing.T) { + // Select a different tenant status server here so a pod-to-pod RPC will + // happen. + status := + helper.testCluster().tenantStatusSrv(2 /* idx */) + sqlConn := helper.testCluster().tenantConn(0 /* idx */) + run(sqlConn, status, 1 /* coordinatorNodeID */) + }) +} diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 93a7f8bc5d6c..d68dbaf30c2a 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -146,6 +146,8 @@ go_library( "//pkg/sql/catalog/systemschema", "//pkg/sql/colexec", "//pkg/sql/contention", + "//pkg/sql/contention/txnidcache", + "//pkg/sql/contentionpb", "//pkg/sql/descmetadata", "//pkg/sql/distsql", "//pkg/sql/execinfra", diff --git a/pkg/server/serverpb/status.go b/pkg/server/serverpb/status.go index 7d9fde5a19aa..413fcfa3ceb0 100644 --- a/pkg/server/serverpb/status.go +++ b/pkg/server/serverpb/status.go @@ -35,6 +35,7 @@ type SQLStatusServer interface { ResetIndexUsageStats(context.Context, *ResetIndexUsageStatsRequest) (*ResetIndexUsageStatsResponse, error) TableIndexStats(context.Context, *TableIndexStatsRequest) (*TableIndexStatsResponse, error) UserSQLRoles(ctx context.Context, request *UserSQLRolesRequest) (*UserSQLRolesResponse, error) + TxnIDResolution(context.Context, *TxnIDResolutionRequest) (*TxnIDResolutionResponse, error) } // OptionalNodesStatusServer is a StatusServer that is only optionally present diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto index d2f3cf665f35..e3ad2c7943bd 100644 --- a/pkg/server/serverpb/status.proto +++ b/pkg/server/serverpb/status.proto @@ -1418,6 +1418,24 @@ message UserSQLRolesResponse { repeated string roles = 1; } +// Request object for issuing Transaction ID Resolution. +message TxnIDResolutionRequest { + int32 CoordinatorNodeID = 1 [ + (gogoproto.customname) = "CoordinatorNodeID" + ]; + + repeated bytes txnIDs = 2 [ + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID", + (gogoproto.nullable) = false + ]; +} + +// Response object for issuing Transaction ID Resolution. +message TxnIDResolutionResponse { + repeated cockroach.sql.contentionpb.ResolvedTxnID resolvedTxnIDs = 1 [ + (gogoproto.nullable) = false]; +} + service Status { // Certificates retrieves a copy of the TLS certificates. rpc Certificates(CertificatesRequest) returns (CertificatesResponse) { @@ -1802,4 +1820,6 @@ service Status { get: "/_status/sqlroles" }; } + + rpc TxnIDResolution(TxnIDResolutionRequest) returns (TxnIDResolutionResponse) {} } diff --git a/pkg/server/status.go b/pkg/server/status.go index dc8ec377232a..a79598883528 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -53,6 +53,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/contention" + "github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache" + "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" "github.com/cockroachdb/cockroach/pkg/sql/roleoption" "github.com/cockroachdb/cockroach/pkg/util" @@ -66,6 +68,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" gwruntime "github.com/grpc-ecosystem/grpc-gateway/runtime" raft "go.etcd.io/etcd/raft/v3" @@ -357,6 +360,50 @@ func (b *baseStatusServer) ListLocalDistSQLFlows( return response, nil } +func (b *baseStatusServer) localTxnIDResolution( + req *serverpb.TxnIDResolutionRequest, +) *serverpb.TxnIDResolutionResponse { + txnIDCache := b.sqlServer.pgServer.SQLServer.GetTxnIDCache() + + unresolvedTxnIDs := make(map[uuid.UUID]struct{}, len(req.TxnIDs)) + for _, txnID := range req.TxnIDs { + unresolvedTxnIDs[txnID] = struct{}{} + } + + resp := &serverpb.TxnIDResolutionResponse{ + ResolvedTxnIDs: make([]contentionpb.ResolvedTxnID, 0, len(req.TxnIDs)), + } + + // Optimistically attempt to resolve txn IDs. However, if the optimistic + // attempt fails, we flush the txnID cache and try again. + unresolvedTxnIDs, resp.ResolvedTxnIDs = + resolveTxnIDHelper(txnIDCache, unresolvedTxnIDs, resp.ResolvedTxnIDs) + + if len(unresolvedTxnIDs) > 0 { + txnIDCache.Flush() + } + + _, resp.ResolvedTxnIDs = + resolveTxnIDHelper(txnIDCache, unresolvedTxnIDs, resp.ResolvedTxnIDs) + + return resp +} + +func resolveTxnIDHelper( + txnIDCache *txnidcache.Cache, query map[uuid.UUID]struct{}, result []contentionpb.ResolvedTxnID, +) (remaining map[uuid.UUID]struct{}, _ []contentionpb.ResolvedTxnID) { + for txnID := range query { + if txnFingerprintID, found := txnIDCache.Lookup(txnID); found { + delete(query, txnID) + result = append(result, contentionpb.ResolvedTxnID{ + TxnID: txnID, + TxnFingerprintID: txnFingerprintID, + }) + } + } + return query, result +} + // A statusServer provides a RESTful status API. type statusServer struct { *baseStatusServer @@ -2838,3 +2885,25 @@ func (s *statusServer) JobStatus( return &serverpb.JobStatusResponse{Job: res}, nil } + +func (s *statusServer) TxnIDResolution( + ctx context.Context, req *serverpb.TxnIDResolutionRequest, +) (*serverpb.TxnIDResolutionResponse, error) { + ctx = s.AnnotateCtx(propagateGatewayMetadata(ctx)) + if _, err := s.privilegeChecker.requireAdminUser(ctx); err != nil { + return nil, err + } + + nodeID := roachpb.NodeID(req.CoordinatorNodeID) + local := nodeID == s.gossip.NodeID.Get() + if local { + return s.localTxnIDResolution(req), nil + } + + statusClient, err := s.dialNode(ctx, nodeID) + if err != nil { + return nil, err + } + + return statusClient.TxnIDResolution(ctx, req) +} diff --git a/pkg/server/tenant_status.go b/pkg/server/tenant_status.go index 67a317536c37..2cf104c20ced 100644 --- a/pkg/server/tenant_status.go +++ b/pkg/server/tenant_status.go @@ -888,3 +888,29 @@ func (t *tenantStatusServer) TableIndexStats( return getTableIndexUsageStats(ctx, req, t.sqlServer.pgServer.SQLServer.GetLocalIndexStatistics(), t.sqlServer.internalExecutor) } + +func (t *tenantStatusServer) TxnIDResolution( + ctx context.Context, req *serverpb.TxnIDResolutionRequest, +) (*serverpb.TxnIDResolutionResponse, error) { + ctx = t.AnnotateCtx(propagateGatewayMetadata(ctx)) + if _, err := t.privilegeChecker.requireAdminUser(ctx); err != nil { + return nil, err + } + + instanceID := base.SQLInstanceID(req.CoordinatorNodeID) + local := instanceID == t.sqlServer.SQLInstanceID() + if local { + return t.localTxnIDResolution(req), nil + } + + instance, err := t.sqlServer.sqlInstanceProvider.GetInstance(ctx, instanceID) + if err != nil { + return nil, err + } + statusClient, err := t.dialPod(ctx, instanceID, instance.InstanceAddr) + if err != nil { + return nil, err + } + + return statusClient.TxnIDResolution(ctx, req) +} diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index cfe7be587e88..df92680d3ca3 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -312,6 +312,7 @@ go_library( "//pkg/sql/colflow", "//pkg/sql/contention", "//pkg/sql/contention/txnidcache", + "//pkg/sql/contentionpb", "//pkg/sql/covering", "//pkg/sql/delegate", "//pkg/sql/descmetadata", diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index fb36d8deadf8..103fbcd365aa 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -25,7 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" - "github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache" + "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/opt/exec/explain" @@ -1951,7 +1951,7 @@ func (ex *connExecutor) onTxnRestart(ctx context.Context) { func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) { // Transaction fingerprint ID will be available once transaction finishes // execution. - ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{ + ex.txnIDCacheWriter.Record(contentionpb.ResolvedTxnID{ TxnID: txnID, TxnFingerprintID: roachpb.InvalidTransactionFingerprintID, }) @@ -2022,7 +2022,7 @@ func (ex *connExecutor) recordTransactionFinish( ex.metrics.EngineMetrics.SQLTxnsOpen.Dec(1) ex.metrics.EngineMetrics.SQLTxnLatency.RecordValue(txnTime.Nanoseconds()) - ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{ + ex.txnIDCacheWriter.Record(contentionpb.ResolvedTxnID{ TxnID: ev.txnID, TxnFingerprintID: transactionFingerprintID, }) diff --git a/pkg/sql/contention/txnidcache/BUILD.bazel b/pkg/sql/contention/txnidcache/BUILD.bazel index 87685dab0f76..8d75039437e4 100644 --- a/pkg/sql/contention/txnidcache/BUILD.bazel +++ b/pkg/sql/contention/txnidcache/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/roachpb:with-mocks", "//pkg/settings", "//pkg/settings/cluster", + "//pkg/sql/contentionpb", "//pkg/util/cache", "//pkg/util/encoding", "//pkg/util/metric", diff --git a/pkg/sql/contention/txnidcache/concurrent_write_buffer.go b/pkg/sql/contention/txnidcache/concurrent_write_buffer.go index 2827afd1eb8a..1409ed392fbe 100644 --- a/pkg/sql/contention/txnidcache/concurrent_write_buffer.go +++ b/pkg/sql/contention/txnidcache/concurrent_write_buffer.go @@ -14,12 +14,13 @@ import ( "sync" "sync/atomic" + "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) const messageBlockSize = 1024 -type messageBlock [messageBlockSize]ResolvedTxnID +type messageBlock [messageBlockSize]contentionpb.ResolvedTxnID // concurrentWriteBuffer is a data structure that optimizes for concurrent // writes and also implements the Writer interface. @@ -72,7 +73,7 @@ func newConcurrentWriteBuffer(sink messageSink, msgBlockPool *sync.Pool) *concur // Record records a mapping from txnID to its corresponding transaction // fingerprint ID. Record is safe to be used concurrently. -func (c *concurrentWriteBuffer) Record(resolvedTxnID ResolvedTxnID) { +func (c *concurrentWriteBuffer) Record(resolvedTxnID contentionpb.ResolvedTxnID) { c.flushSyncLock.RLock() defer c.flushSyncLock.RUnlock() for { diff --git a/pkg/sql/contention/txnidcache/txn_id_cache.go b/pkg/sql/contention/txnidcache/txn_id_cache.go index 9384ed704430..da2f41f7c8eb 100644 --- a/pkg/sql/contention/txnidcache/txn_id_cache.go +++ b/pkg/sql/contention/txnidcache/txn_id_cache.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/util/cache" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -36,7 +37,7 @@ type Writer interface { // Record writes a pair of transactionID and transaction fingerprint ID // into a temporary buffer. This buffer will eventually be flushed into // the transaction ID cache asynchronously. - Record(resolvedTxnID ResolvedTxnID) + Record(resolvedTxnID contentionpb.ResolvedTxnID) // Flush starts the flushing process of writer's temporary buffer. Flush() @@ -125,17 +126,6 @@ var ( roachpb.TransactionFingerprintID(0).Size() ) -// ResolvedTxnID represents a TxnID that is resolved to its corresponding -// TxnFingerprintID. -type ResolvedTxnID struct { - TxnID uuid.UUID - TxnFingerprintID roachpb.TransactionFingerprintID -} - -func (r *ResolvedTxnID) valid() bool { - return r.TxnID != uuid.UUID{} -} - var ( _ Reader = &Cache{} _ Writer = &Cache{} @@ -174,7 +164,7 @@ func (t *Cache) Start(ctx context.Context, stopper *stop.Stopper) { t.mu.Lock() defer t.mu.Unlock() for blockIdx := range msgBlock { - if !msgBlock[blockIdx].valid() { + if !msgBlock[blockIdx].Valid() { break } t.mu.store.Add(msgBlock[blockIdx].TxnID, msgBlock[blockIdx].TxnFingerprintID) @@ -220,7 +210,7 @@ func (t *Cache) Lookup(txnID uuid.UUID) (result roachpb.TransactionFingerprintID } // Record implements the Writer interface. -func (t *Cache) Record(resolvedTxnID ResolvedTxnID) { +func (t *Cache) Record(resolvedTxnID contentionpb.ResolvedTxnID) { t.writer.Record(resolvedTxnID) } diff --git a/pkg/sql/contention/txnidcache/writer.go b/pkg/sql/contention/txnidcache/writer.go index f86bc6bd3916..f87331a27c70 100644 --- a/pkg/sql/contention/txnidcache/writer.go +++ b/pkg/sql/contention/txnidcache/writer.go @@ -13,6 +13,7 @@ package txnidcache import ( "sync" + "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -44,7 +45,7 @@ func newWriter(sink messageSink, msgBlockPool *sync.Pool) *writer { } // Record implements the Writer interface. -func (w *writer) Record(resolvedTxnID ResolvedTxnID) { +func (w *writer) Record(resolvedTxnID contentionpb.ResolvedTxnID) { shardIdx := hashTxnID(resolvedTxnID.TxnID) buffer := w.shards[shardIdx] buffer.Record(resolvedTxnID) diff --git a/pkg/sql/contentionpb/BUILD.bazel b/pkg/sql/contentionpb/BUILD.bazel index 7e9f50ed5fc1..24e898e39bbe 100644 --- a/pkg/sql/contentionpb/BUILD.bazel +++ b/pkg/sql/contentionpb/BUILD.bazel @@ -8,6 +8,7 @@ go_library( embed = [":contentionpb_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/sql/contentionpb", visibility = ["//visibility:public"], + deps = ["//pkg/util/uuid"], ) proto_library( diff --git a/pkg/sql/contentionpb/contention.go b/pkg/sql/contentionpb/contention.go index a45126999f73..40d23752b6ef 100644 --- a/pkg/sql/contentionpb/contention.go +++ b/pkg/sql/contentionpb/contention.go @@ -13,6 +13,8 @@ package contentionpb import ( "fmt" "strings" + + "github.com/cockroachdb/cockroach/pkg/util/uuid" ) const singleIndentation = " " @@ -62,3 +64,8 @@ func (skc SingleNonSQLKeyContention) String() string { } return b.String() } + +// Valid returns if the ResolvedTxnID is valid. +func (r *ResolvedTxnID) Valid() bool { + return r.TxnID != uuid.UUID{} +} diff --git a/pkg/sql/contentionpb/contention.proto b/pkg/sql/contentionpb/contention.proto index baf929956f53..65f221fef5a3 100644 --- a/pkg/sql/contentionpb/contention.proto +++ b/pkg/sql/contentionpb/contention.proto @@ -128,3 +128,23 @@ message SerializedRegistry { repeated SingleNonSQLKeyContention non_sql_keys_contention = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "NonSQLKeysContention"]; } + +message ResolvedTxnID { + // The Response only returns the txnID that is present on the + // inquired node. If the txnID is not present on the inquired + // node, then it is not returned. + bytes txnID = 1 [ + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID", + (gogoproto.nullable) = false + ]; + + // This field is allowed to be null. This means the given transaction ID + // is found on the given node. However, the transaction fingerprint ID + // for that transaction ID is not yet available since the transaction + // has not yet finished executing. + // This would require the node to check ActiveQueries store in addition + // to TxnID Cache. + uint64 txnFingerprintID = 2 [(gogoproto.customname) = "TxnFingerprintID", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TransactionFingerprintID", + (gogoproto.nullable) = false]; +}