diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index d668a644e5fe..d9a7f6aa9332 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -4282,6 +4282,64 @@ UserSQLRolesResponse returns a list of roles for the logged SQL user. +## TxnIDResolution + + + +TxnIDResolution is used by the contention event store to resolve +transaction ID into transaction fingerprint IDs. +This RPC does not have a corresponding HTTP endpoint on purpose, since +DB Console should never directly query this endpoint. + +The API contract is the following: +- if the server can resolve the transaction IDs in the RPC request, it will + be returned in the RPC response. +- if the server is not able to resolve the transaction IDs, it will + instructs the transaction ID cache to drain its write buffer. (Since + transaction ID cache's write path is asynchronous, the transaction ID + requested by the client might not be available in the cache yet). + Client is responsible to perform retries if the requested transaction ID + is not returned in the RPC response. + +Support status: [reserved](#support-status) + +#### Request Parameters + + + + +Request object for issuing Transaction ID Resolution. + + +| Field | Type | Label | Description | Support status | +| ----- | ---- | ----- | ----------- | -------------- | +| coordinator_id | [string](#cockroach.server.serverpb.TxnIDResolutionRequest-string) | | coordinator_id is either the NodeID or SQLInstanceID depending on whether the transaction is executed on a system tenant or a regular tenant. | [reserved](#support-status) | +| txnIDs | [bytes](#cockroach.server.serverpb.TxnIDResolutionRequest-bytes) | repeated | | [reserved](#support-status) | + + + + + + + +#### Response Parameters + + + + +Response object for issuing Transaction ID Resolution. + + +| Field | Type | Label | Description | Support status | +| ----- | ---- | ----- | ----------- | -------------- | +| resolvedTxnIDs | [cockroach.sql.contentionpb.ResolvedTxnID](#cockroach.server.serverpb.TxnIDResolutionResponse-cockroach.sql.contentionpb.ResolvedTxnID) | repeated | | [reserved](#support-status) | + + + + + + + ## RequestCA `GET /_join/v1/ca` diff --git a/pkg/ccl/serverccl/statusccl/BUILD.bazel b/pkg/ccl/serverccl/statusccl/BUILD.bazel index 3c259c311ea7..ded721723a4c 100644 --- a/pkg/ccl/serverccl/statusccl/BUILD.bazel +++ b/pkg/ccl/serverccl/statusccl/BUILD.bazel @@ -54,6 +54,7 @@ go_test( "//pkg/util/log", "//pkg/util/randutil", "//pkg/util/timeutil", + "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", ], diff --git a/pkg/ccl/serverccl/statusccl/tenant_status_test.go b/pkg/ccl/serverccl/statusccl/tenant_status_test.go index 1bea8f4d9fa1..e7c5400f9976 100644 --- a/pkg/ccl/serverccl/statusccl/tenant_status_test.go +++ b/pkg/ccl/serverccl/statusccl/tenant_status_test.go @@ -37,6 +37,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -82,6 +83,10 @@ func TestTenantStatusAPI(t *testing.T) { t.Run("index_usage_stats", func(t *testing.T) { testIndexUsageForTenants(t, testHelper) }) + + t.Run("txn_id_resolution", func(t *testing.T) { + testTxnIDResolutionRPC(ctx, t, testHelper) + }) } func TestTenantCannotSeeNonTenantStats(t *testing.T) { @@ -887,3 +892,60 @@ func testTenantStatusCancelQuery(ctx context.Context, t *testing.T, helper *tena require.Equal(t, false, cancelQueryResp.Canceled) require.Equal(t, fmt.Sprintf("query ID %s not found", query.ID), cancelQueryResp.Error) } + +// testTxnIDResolutionRPC tests the reachability of TxnIDResolution RPC. The +// underlying implementation correctness is tested within +// pkg/sql/contention/txnidcache. +func testTxnIDResolutionRPC(ctx context.Context, t *testing.T, helper *tenantTestHelper) { + run := func(sqlConn *sqlutils.SQLRunner, status serverpb.SQLStatusServer, coordinatorNodeID int32) { + sqlConn.Exec(t, "SET application_name='test1'") + + sqlConn.Exec(t, "BEGIN") + result := sqlConn.QueryStr(t, ` + SELECT + id + FROM + crdb_internal.node_transactions + WHERE + application_name = 'test1'`) + require.Equal(t, 1 /* expected */, len(result), + "expected only one active txn, but there are %d active txns found", len(result)) + txnID := uuid.FromStringOrNil(result[0][0]) + require.False(t, uuid.Nil.Equal(txnID), + "expected a valid txnID, but %+v is found", result) + sqlConn.Exec(t, "COMMIT") + + testutils.SucceedsWithin(t, func() error { + resp, err := status.TxnIDResolution(ctx, &serverpb.TxnIDResolutionRequest{ + CoordinatorID: strconv.Itoa(int(coordinatorNodeID)), + TxnIDs: []uuid.UUID{txnID}, + }) + require.NoError(t, err) + if len(resp.ResolvedTxnIDs) != 1 { + return errors.Newf("expected RPC response to have length of 1, but "+ + "it is %d", len(resp.ResolvedTxnIDs)) + } + require.Equal(t, txnID, resp.ResolvedTxnIDs[0].TxnID, + "expected to find txn %s on coordinator node %d, but it "+ + "was not", txnID.String(), coordinatorNodeID) + require.NotEqual(t, roachpb.InvalidTransactionFingerprintID, resp.ResolvedTxnIDs[0].TxnFingerprintID) + return nil + }, 3*time.Second) + } + + t.Run("regular_cluster", func(t *testing.T) { + status := + helper.hostCluster.Server(0 /* idx */).StatusServer().(serverpb.SQLStatusServer) + sqlConn := helper.hostCluster.ServerConn(0 /* idx */) + run(sqlutils.MakeSQLRunner(sqlConn), status, 1 /* coordinatorNodeID */) + }) + + t.Run("tenant_cluster", func(t *testing.T) { + // Select a different tenant status server here so a pod-to-pod RPC will + // happen. + status := + helper.testCluster().tenantStatusSrv(2 /* idx */) + sqlConn := helper.testCluster().tenantConn(0 /* idx */) + run(sqlConn, status, 1 /* coordinatorNodeID */) + }) +} diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 2a1a984aa15e..fc66834c1b4d 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -151,6 +151,7 @@ go_library( "//pkg/sql/catalog/systemschema", "//pkg/sql/colexec", "//pkg/sql/contention", + "//pkg/sql/contentionpb", "//pkg/sql/descmetadata", "//pkg/sql/distsql", "//pkg/sql/execinfra", diff --git a/pkg/server/serverpb/status.go b/pkg/server/serverpb/status.go index c2f691577d68..dd19acd85a13 100644 --- a/pkg/server/serverpb/status.go +++ b/pkg/server/serverpb/status.go @@ -36,6 +36,7 @@ type SQLStatusServer interface { ResetIndexUsageStats(context.Context, *ResetIndexUsageStatsRequest) (*ResetIndexUsageStatsResponse, error) TableIndexStats(context.Context, *TableIndexStatsRequest) (*TableIndexStatsResponse, error) UserSQLRoles(ctx context.Context, request *UserSQLRolesRequest) (*UserSQLRolesResponse, error) + TxnIDResolution(context.Context, *TxnIDResolutionRequest) (*TxnIDResolutionResponse, error) } // OptionalNodesStatusServer is a StatusServer that is only optionally present diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto index 44a4feb0fea5..5caea44e067f 100644 --- a/pkg/server/serverpb/status.proto +++ b/pkg/server/serverpb/status.proto @@ -1489,6 +1489,24 @@ message UserSQLRolesResponse { repeated string roles = 1; } +// Request object for issuing Transaction ID Resolution. +message TxnIDResolutionRequest { + // coordinator_id is either the NodeID or SQLInstanceID depending on whether + // the transaction is executed on a system tenant or a regular tenant. + string coordinator_id = 1 [(gogoproto.customname) = "CoordinatorID"]; + + repeated bytes txnIDs = 2 [ + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID", + (gogoproto.nullable) = false + ]; +} + +// Response object for issuing Transaction ID Resolution. +message TxnIDResolutionResponse { + repeated cockroach.sql.contentionpb.ResolvedTxnID resolvedTxnIDs = 1 [ + (gogoproto.nullable) = false]; +} + service Status { // Certificates retrieves a copy of the TLS certificates. rpc Certificates(CertificatesRequest) returns (CertificatesResponse) { @@ -1881,4 +1899,20 @@ service Status { get: "/_status/sqlroles" }; } + + // TxnIDResolution is used by the contention event store to resolve + // transaction ID into transaction fingerprint IDs. + // This RPC does not have a corresponding HTTP endpoint on purpose, since + // DB Console should never directly query this endpoint. + // + // The API contract is the following: + // - if the server can resolve the transaction IDs in the RPC request, it will + // be returned in the RPC response. + // - if the server is not able to resolve the transaction IDs, it will + // instructs the transaction ID cache to drain its write buffer. (Since + // transaction ID cache's write path is asynchronous, the transaction ID + // requested by the client might not be available in the cache yet). + // Client is responsible to perform retries if the requested transaction ID + // is not returned in the RPC response. + rpc TxnIDResolution(TxnIDResolutionRequest) returns (TxnIDResolutionResponse) {} } diff --git a/pkg/server/status.go b/pkg/server/status.go index 407068a05818..5d736918539e 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -55,6 +55,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catconstants" "github.com/cockroachdb/cockroach/pkg/sql/contention" + "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirecancel" "github.com/cockroachdb/cockroach/pkg/sql/roleoption" @@ -69,6 +70,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" gwruntime "github.com/grpc-ecosystem/grpc-gateway/runtime" raft "go.etcd.io/etcd/raft/v3" @@ -366,6 +368,39 @@ func (b *baseStatusServer) ListLocalDistSQLFlows( return response, nil } +func (b *baseStatusServer) localTxnIDResolution( + req *serverpb.TxnIDResolutionRequest, +) *serverpb.TxnIDResolutionResponse { + txnIDCache := b.sqlServer.pgServer.SQLServer.GetTxnIDCache() + + unresolvedTxnIDs := make(map[uuid.UUID]struct{}, len(req.TxnIDs)) + for _, txnID := range req.TxnIDs { + unresolvedTxnIDs[txnID] = struct{}{} + } + + resp := &serverpb.TxnIDResolutionResponse{ + ResolvedTxnIDs: make([]contentionpb.ResolvedTxnID, 0, len(req.TxnIDs)), + } + + for i := range req.TxnIDs { + if txnFingerprintID, found := txnIDCache.Lookup(req.TxnIDs[i]); found { + resp.ResolvedTxnIDs = append(resp.ResolvedTxnIDs, contentionpb.ResolvedTxnID{ + TxnID: req.TxnIDs[i], + TxnFingerprintID: txnFingerprintID, + }) + } + } + + // If we encounter any transaction ID that we cannot resolve, we tell the + // txnID cache to drain its write buffer (note: The .DrainWriteBuffer() call + // is asynchronous). The client of this RPC will perform retries. + if len(unresolvedTxnIDs) > 0 { + txnIDCache.DrainWriteBuffer() + } + + return resp +} + // A statusServer provides a RESTful status API. type statusServer struct { *baseStatusServer @@ -3028,3 +3063,27 @@ func (s *statusServer) JobStatus( return &serverpb.JobStatusResponse{Job: res}, nil } + +func (s *statusServer) TxnIDResolution( + ctx context.Context, req *serverpb.TxnIDResolutionRequest, +) (*serverpb.TxnIDResolutionResponse, error) { + ctx = s.AnnotateCtx(propagateGatewayMetadata(ctx)) + if _, err := s.privilegeChecker.requireAdminUser(ctx); err != nil { + return nil, err + } + + requestedNodeID, local, err := s.parseNodeID(req.CoordinatorID) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, err.Error()) + } + if local { + return s.localTxnIDResolution(req), nil + } + + statusClient, err := s.dialNode(ctx, requestedNodeID) + if err != nil { + return nil, err + } + + return statusClient.TxnIDResolution(ctx, req) +} diff --git a/pkg/server/tenant_status.go b/pkg/server/tenant_status.go index 53a0cbba28d7..714782b050cd 100644 --- a/pkg/server/tenant_status.go +++ b/pkg/server/tenant_status.go @@ -1042,3 +1042,31 @@ func (t *tenantStatusServer) NodesList( } return &resp, err } + +func (t *tenantStatusServer) TxnIDResolution( + ctx context.Context, req *serverpb.TxnIDResolutionRequest, +) (*serverpb.TxnIDResolutionResponse, error) { + ctx = t.AnnotateCtx(propagateGatewayMetadata(ctx)) + if _, err := t.privilegeChecker.requireAdminUser(ctx); err != nil { + return nil, err + } + + instanceID, local, err := t.parseInstanceID(req.CoordinatorID) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, err.Error()) + } + if local { + return t.localTxnIDResolution(req), nil + } + + instance, err := t.sqlServer.sqlInstanceProvider.GetInstance(ctx, instanceID) + if err != nil { + return nil, err + } + statusClient, err := t.dialPod(ctx, instanceID, instance.InstanceAddr) + if err != nil { + return nil, err + } + + return statusClient.TxnIDResolution(ctx, req) +} diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 646718fac821..298685ce69be 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -312,6 +312,7 @@ go_library( "//pkg/sql/colflow", "//pkg/sql/contention", "//pkg/sql/contention/txnidcache", + "//pkg/sql/contentionpb", "//pkg/sql/covering", "//pkg/sql/delegate", "//pkg/sql/distsql", diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index a0a86e80865e..a974684629c6 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -26,7 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" - "github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache" + "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/opt/exec/explain" @@ -1952,7 +1952,7 @@ func (ex *connExecutor) onTxnRestart(ctx context.Context) { func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) { // Transaction fingerprint ID will be available once transaction finishes // execution. - ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{ + ex.txnIDCacheWriter.Record(contentionpb.ResolvedTxnID{ TxnID: txnID, TxnFingerprintID: roachpb.InvalidTransactionFingerprintID, }) @@ -2027,7 +2027,7 @@ func (ex *connExecutor) recordTransactionFinish( } ex.metrics.EngineMetrics.SQLTxnLatency.RecordValue(txnTime.Nanoseconds()) - ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{ + ex.txnIDCacheWriter.Record(contentionpb.ResolvedTxnID{ TxnID: ev.txnID, TxnFingerprintID: transactionFingerprintID, }) diff --git a/pkg/sql/contention/txnidcache/BUILD.bazel b/pkg/sql/contention/txnidcache/BUILD.bazel index 3551b9e641b0..62327daab8f3 100644 --- a/pkg/sql/contention/txnidcache/BUILD.bazel +++ b/pkg/sql/contention/txnidcache/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/contention/contentionutils", + "//pkg/sql/contentionpb", "//pkg/util/encoding", "//pkg/util/metric", "//pkg/util/stop", @@ -42,6 +43,7 @@ go_test( "//pkg/server", "//pkg/settings/cluster", "//pkg/sql", + "//pkg/sql/contentionpb", "//pkg/sql/sessiondata", "//pkg/sql/tests", "//pkg/testutils", diff --git a/pkg/sql/contention/txnidcache/concurrent_write_buffer.go b/pkg/sql/contention/txnidcache/concurrent_write_buffer.go index 8ded7fea4167..329d75a06fb1 100644 --- a/pkg/sql/contention/txnidcache/concurrent_write_buffer.go +++ b/pkg/sql/contention/txnidcache/concurrent_write_buffer.go @@ -14,13 +14,14 @@ import ( "sync" "github.com/cockroachdb/cockroach/pkg/sql/contention/contentionutils" + "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" ) // blockSize is chosen as 168 since each ResolvedTxnID is 24 byte. // 168 * 24 = 4032 bytes < 4KiB page size. const blockSize = 168 -type block [blockSize]ResolvedTxnID +type block [blockSize]contentionpb.ResolvedTxnID var blockPool = &sync.Pool{ New: func() interface{} { @@ -69,14 +70,14 @@ func newConcurrentWriteBuffer(sink blockSink) *concurrentWriteBuffer { // Record records a mapping from txnID to its corresponding transaction // fingerprint ID. Record is safe to be used concurrently. -func (c *concurrentWriteBuffer) Record(resolvedTxnID ResolvedTxnID) { +func (c *concurrentWriteBuffer) Record(resolvedTxnID contentionpb.ResolvedTxnID) { c.guard.AtomicWrite(func(writerIdx int64) { c.guard.block[writerIdx] = resolvedTxnID }) } -// Flush flushes concurrentWriteBuffer into the channel. It implements the -// txnidcache.Writer interface. -func (c *concurrentWriteBuffer) Flush() { +// DrainWriteBuffer flushes concurrentWriteBuffer into the channel. It +// implements the txnidcache.Writer interface. +func (c *concurrentWriteBuffer) DrainWriteBuffer() { c.guard.ForceSync() } diff --git a/pkg/sql/contention/txnidcache/fifo_cache.go b/pkg/sql/contention/txnidcache/fifo_cache.go index d92dc0036299..5217d608e5d1 100644 --- a/pkg/sql/contention/txnidcache/fifo_cache.go +++ b/pkg/sql/contention/txnidcache/fifo_cache.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/contention/contentionutils" + "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -71,7 +72,7 @@ func (c *fifoCache) add(b *block) { blockSize := 0 for i := range b { - if !b[i].valid() { + if !b[i].Valid() { break } @@ -120,7 +121,7 @@ func (c *fifoCache) maybeEvictLocked() { // evictNodeLocked deletes all entries in the block from the internal map. func (c *fifoCache) evictNodeLocked(node *blockListNode) { for i := 0; i < blockSize; i++ { - if !node.block[i].valid() { + if !node.block[i].Valid() { break } @@ -128,7 +129,7 @@ func (c *fifoCache) evictNodeLocked(node *blockListNode) { } } -func (e *blockList) append(block []ResolvedTxnID) { +func (e *blockList) append(block []contentionpb.ResolvedTxnID) { block = e.appendToTail(block) for len(block) > 0 { e.addNode() @@ -147,7 +148,9 @@ func (e *blockList) addNode() { e.tailIdx = 0 } -func (e *blockList) appendToTail(block []ResolvedTxnID) (remaining []ResolvedTxnID) { +func (e *blockList) appendToTail( + block []contentionpb.ResolvedTxnID, +) (remaining []contentionpb.ResolvedTxnID) { if e.head == nil { return block } diff --git a/pkg/sql/contention/txnidcache/fifo_cache_test.go b/pkg/sql/contention/txnidcache/fifo_cache_test.go index 432e3d456476..e43fff26ce03 100644 --- a/pkg/sql/contention/txnidcache/fifo_cache_test.go +++ b/pkg/sql/contention/txnidcache/fifo_cache_test.go @@ -16,6 +16,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/stretchr/testify/require" ) @@ -93,7 +94,7 @@ func TestFIFOCache(t *testing.T) { } func removeEntryFromMap( - m map[uuid.UUID]roachpb.TransactionFingerprintID, filterList []ResolvedTxnID, + m map[uuid.UUID]roachpb.TransactionFingerprintID, filterList []contentionpb.ResolvedTxnID, ) map[uuid.UUID]roachpb.TransactionFingerprintID { newMap := make(map[uuid.UUID]roachpb.TransactionFingerprintID) for k, v := range m { @@ -119,7 +120,7 @@ func checkEvictionListContent(t *testing.T, cache *fifoCache, expectedBlocks []* "is nil", evictionListSize) for blockIdx := 0; blockIdx < blockSize; blockIdx++ { - if !expectedBlocks[i][blockIdx].valid() { + if !expectedBlocks[i][blockIdx].Valid() { break } @@ -131,7 +132,7 @@ func checkEvictionListContent(t *testing.T, cache *fifoCache, expectedBlocks []* evictionListBlockIdx++ isEvictionListIdxStillValid := - evictionListBlockIdx < blockSize && cur.block[evictionListBlockIdx].valid() + evictionListBlockIdx < blockSize && cur.block[evictionListBlockIdx].Valid() if !isEvictionListIdxStillValid { cur = cur.next @@ -154,7 +155,7 @@ func checkEvictionListShape(t *testing.T, cache *fifoCache, expectedBlockSizes [ actualBlockSize := 0 for blockIdx := 0; blockIdx < blockSize; blockIdx++ { - if !cur.block[blockIdx].valid() { + if !cur.block[blockIdx].Valid() { break } actualBlockSize++ diff --git a/pkg/sql/contention/txnidcache/txn_id_cache.go b/pkg/sql/contention/txnidcache/txn_id_cache.go index ae7cd92b7fe0..5baad3dab7e0 100644 --- a/pkg/sql/contention/txnidcache/txn_id_cache.go +++ b/pkg/sql/contention/txnidcache/txn_id_cache.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -33,10 +34,11 @@ type Writer interface { // Record writes a pair of transactionID and transaction fingerprint ID // into a temporary buffer. This buffer will eventually be flushed into // the transaction ID cache asynchronously. - Record(resolvedTxnID ResolvedTxnID) + Record(resolvedTxnID contentionpb.ResolvedTxnID) - // Flush starts the flushing process of writer's temporary buffer. - Flush() + // DrainWriteBuffer starts to flush of writer's temporary buffer into the + // cache. + DrainWriteBuffer() } type blockSink interface { @@ -115,17 +117,6 @@ var ( roachpb.TransactionFingerprintID(0).Size() ) -// ResolvedTxnID represents a TxnID that is resolved to its corresponding -// TxnFingerprintID. -type ResolvedTxnID struct { - TxnID uuid.UUID - TxnFingerprintID roachpb.TransactionFingerprintID -} - -func (r *ResolvedTxnID) valid() bool { - return r.TxnID != uuid.UUID{} -} - var ( _ Reader = &Cache{} _ Writer = &Cache{} @@ -181,7 +172,7 @@ func (t *Cache) Lookup(txnID uuid.UUID) (result roachpb.TransactionFingerprintID } // Record implements the Writer interface. -func (t *Cache) Record(resolvedTxnID ResolvedTxnID) { +func (t *Cache) Record(resolvedTxnID contentionpb.ResolvedTxnID) { t.writer.Record(resolvedTxnID) } @@ -193,9 +184,9 @@ func (t *Cache) push(b *block) { } } -// Flush flushes the resolved txn IDs in the Writer into the Cache. -func (t *Cache) Flush() { - t.writer.Flush() +// DrainWriteBuffer flushes the resolved txn IDs in the Writer into the Cache. +func (t *Cache) DrainWriteBuffer() { + t.writer.DrainWriteBuffer() } // Size return the current size of the Cache. diff --git a/pkg/sql/contention/txnidcache/txn_id_cache_test.go b/pkg/sql/contention/txnidcache/txn_id_cache_test.go index 4e70c0a5ee9f..adffdb932651 100644 --- a/pkg/sql/contention/txnidcache/txn_id_cache_test.go +++ b/pkg/sql/contention/txnidcache/txn_id_cache_test.go @@ -173,7 +173,7 @@ func TestTransactionIDCache(t *testing.T) { sqlServer := testServer.SQLServer().(*sql.Server) txnIDCache := sqlServer.GetTxnIDCache() - txnIDCache.Flush() + txnIDCache.DrainWriteBuffer() t.Run("resolved_txn_id_cache_record", func(t *testing.T) { testutils.SucceedsWithin(t, func() error { for txnID, expectedTxnFingerprintID := range expectedTxnIDToUUIDMapping { @@ -196,7 +196,7 @@ func TestTransactionIDCache(t *testing.T) { // Execute additional queries to ensure we are overflowing the size limit. testConn.Exec(t, "SELECT 1") - txnIDCache.Flush() + txnIDCache.DrainWriteBuffer() testutils.SucceedsWithin(t, func() error { sizePostEviction := txnIDCache.Size() @@ -218,7 +218,7 @@ func TestTransactionIDCache(t *testing.T) { txnFingerprintID roachpb.TransactionFingerprintID) { if strings.Contains(sessionData.ApplicationName, appName) { if txnFingerprintID != roachpb.InvalidTransactionFingerprintID { - txnIDCache.Flush() + txnIDCache.DrainWriteBuffer() testutils.SucceedsWithin(t, func() error { existingTxnFingerprintID, ok := txnIDCache.Lookup(txnID) diff --git a/pkg/sql/contention/txnidcache/writer.go b/pkg/sql/contention/txnidcache/writer.go index dc338111fd46..88c2f4d2c9de 100644 --- a/pkg/sql/contention/txnidcache/writer.go +++ b/pkg/sql/contention/txnidcache/writer.go @@ -12,6 +12,7 @@ package txnidcache import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -44,7 +45,7 @@ func newWriter(st *cluster.Settings, sink blockSink) *writer { } // Record implements the Writer interface. -func (w *writer) Record(resolvedTxnID ResolvedTxnID) { +func (w *writer) Record(resolvedTxnID contentionpb.ResolvedTxnID) { if MaxSize.Get(&w.st.SV) == 0 { return } @@ -53,10 +54,10 @@ func (w *writer) Record(resolvedTxnID ResolvedTxnID) { buffer.Record(resolvedTxnID) } -// Flush implements the Writer interface. -func (w *writer) Flush() { +// DrainWriteBuffer implements the Writer interface. +func (w *writer) DrainWriteBuffer() { for shardIdx := 0; shardIdx < shardCount; shardIdx++ { - w.shards[shardIdx].Flush() + w.shards[shardIdx].DrainWriteBuffer() } } diff --git a/pkg/sql/contention/txnidcache/writer_test.go b/pkg/sql/contention/txnidcache/writer_test.go index 873c61363f41..efcc383710d3 100644 --- a/pkg/sql/contention/txnidcache/writer_test.go +++ b/pkg/sql/contention/txnidcache/writer_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -94,7 +95,7 @@ func BenchmarkWriter(b *testing.B) { randomValueBase := numOfOps * writerIdx for i := 0; i < numOfOps; i++ { randomValue := randomValueBase + i - w.Record(ResolvedTxnID{ + w.Record(contentionpb.ResolvedTxnID{ TxnID: generateUUID(uint64(randomValue)), TxnFingerprintID: roachpb.TransactionFingerprintID(math.MaxInt - randomValue), }) @@ -160,7 +161,7 @@ var _ blockSink = &counterSink{} func (c *counterSink) push(block *block) { for i := 0; i < blockSize; i++ { - if !block[i].valid() { + if !block[i].Valid() { break } c.numOfRecord++ @@ -173,28 +174,28 @@ func TestTxnIDCacheCanBeDisabledViaClusterSetting(t *testing.T) { sink := &counterSink{} w := newWriter(st, sink) - w.Record(ResolvedTxnID{ + w.Record(contentionpb.ResolvedTxnID{ TxnID: uuid.FastMakeV4(), }) - w.Flush() + w.DrainWriteBuffer() require.Equal(t, 1, sink.numOfRecord) // This should disable txn id cache. MaxSize.Override(ctx, &st.SV, 0) - w.Record(ResolvedTxnID{ + w.Record(contentionpb.ResolvedTxnID{ TxnID: uuid.FastMakeV4(), }) - w.Flush() + w.DrainWriteBuffer() require.Equal(t, 1, sink.numOfRecord) // This should re-enable txn id cache. MaxSize.Override(ctx, &st.SV, MaxSize.Default()) - w.Record(ResolvedTxnID{ + w.Record(contentionpb.ResolvedTxnID{ TxnID: uuid.FastMakeV4(), }) - w.Flush() + w.DrainWriteBuffer() require.Equal(t, 2, sink.numOfRecord) } diff --git a/pkg/sql/contentionpb/BUILD.bazel b/pkg/sql/contentionpb/BUILD.bazel index 3eb6b15fe1b3..44ac6944e925 100644 --- a/pkg/sql/contentionpb/BUILD.bazel +++ b/pkg/sql/contentionpb/BUILD.bazel @@ -8,6 +8,7 @@ go_library( embed = [":contentionpb_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/sql/contentionpb", visibility = ["//visibility:public"], + deps = ["//pkg/util/uuid"], ) proto_library( diff --git a/pkg/sql/contentionpb/contention.go b/pkg/sql/contentionpb/contention.go index a45126999f73..746a77d4f577 100644 --- a/pkg/sql/contentionpb/contention.go +++ b/pkg/sql/contentionpb/contention.go @@ -13,6 +13,8 @@ package contentionpb import ( "fmt" "strings" + + "github.com/cockroachdb/cockroach/pkg/util/uuid" ) const singleIndentation = " " @@ -62,3 +64,8 @@ func (skc SingleNonSQLKeyContention) String() string { } return b.String() } + +// Valid returns if the ResolvedTxnID is valid. +func (r *ResolvedTxnID) Valid() bool { + return !uuid.Nil.Equal(r.TxnID) +} diff --git a/pkg/sql/contentionpb/contention.proto b/pkg/sql/contentionpb/contention.proto index baf929956f53..86d8aee1cfa0 100644 --- a/pkg/sql/contentionpb/contention.proto +++ b/pkg/sql/contentionpb/contention.proto @@ -128,3 +128,18 @@ message SerializedRegistry { repeated SingleNonSQLKeyContention non_sql_keys_contention = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "NonSQLKeysContention"]; } + +message ResolvedTxnID { + bytes txnID = 1 [ + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID", + (gogoproto.nullable) = false + ]; + + // txnFingerprintID is the corresponding transaction fingerprint ID of the + // txnID. However, if the transaction fingerprint ID of the txnID is not yet + // known, (i.e. when the transaction is still executing), this value is filled + // with roachpb.InvalidTransactionFingerprintID. + uint64 txnFingerprintID = 2 [(gogoproto.customname) = "TxnFingerprintID", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TransactionFingerprintID", + (gogoproto.nullable) = false]; +}