From fd8f40a18141fbdb4d582f7b283703fcbb6974df Mon Sep 17 00:00:00 2001 From: Azhng Date: Wed, 16 Feb 2022 03:50:12 +0000 Subject: [PATCH] sql: extend resolverQueue to resolve waiter txn id Previously, the resovlerQueue used in the contention event store only resolved the txnID of the blocking transaction. This commit, the resolverQueue would also resolve the txnID of the waiting transaction. Release note: None --- pkg/BUILD.bazel | 1 + pkg/sql/contention/event_store.go | 21 +- pkg/sql/contention/event_store_test.go | 6 +- pkg/sql/contention/resolver.go | 184 +++++++++++----- pkg/sql/contention/resolver_test.go | 273 +++++++++++++++++++----- pkg/sql/contentionpb/BUILD.bazel | 20 +- pkg/sql/contentionpb/contention.go | 31 +++ pkg/sql/contentionpb/contention_test.go | 67 ++++++ 8 files changed, 483 insertions(+), 120 deletions(-) create mode 100644 pkg/sql/contentionpb/contention_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index a931feacecfa..46d6725b0947 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -254,6 +254,7 @@ ALL_TESTS = [ "//pkg/sql/contention/contentionutils:contentionutils_test", "//pkg/sql/contention/txnidcache:txnidcache_test", "//pkg/sql/contention:contention_test", + "//pkg/sql/contentionpb:contentionpb_test", "//pkg/sql/covering:covering_test", "//pkg/sql/distsql:distsql_test", "//pkg/sql/doctor:doctor_test", diff --git a/pkg/sql/contention/event_store.go b/pkg/sql/contention/event_store.go index 77e623c15dd7..f6348536488a 100644 --- a/pkg/sql/contention/event_store.go +++ b/pkg/sql/contention/event_store.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/cockroachdb/cockroach/pkg/util/uuid" ) const ( @@ -248,14 +247,14 @@ func (s *eventStore) forEachEvent( // is important since the op() callback can take arbitrary long to execute, // we should not be holding the lock while op() is 
executing. s.mu.RLock() - keys := make([]uuid.UUID, 0, s.mu.store.Len()) + keys := make([]uint64, 0, s.mu.store.Len()) s.mu.store.Do(func(entry *cache.Entry) { - keys = append(keys, entry.Key.(uuid.UUID)) + keys = append(keys, entry.Key.(uint64)) }) s.mu.RUnlock() for i := range keys { - event, ok := s.getEventByBlockingTxnID(keys[i]) + event, ok := s.getEventByEventHash(keys[i]) if !ok { // The event might have been evicted between reading the keys and // getting the event. In this case we simply ignore it. @@ -269,13 +268,13 @@ func (s *eventStore) forEachEvent( return nil } -func (s *eventStore) getEventByBlockingTxnID( - txnID uuid.UUID, +func (s *eventStore) getEventByEventHash( + hash uint64, ) (_ contentionpb.ExtendedContentionEvent, ok bool) { s.mu.RLock() defer s.mu.RUnlock() - event, ok := s.mu.store.Get(txnID) + event, ok := s.mu.store.Get(hash) return event.(contentionpb.ExtendedContentionEvent), ok } @@ -319,7 +318,7 @@ func (s *eventStore) upsertBatch(events []contentionpb.ExtendedContentionEvent) if !ok { atomic.AddInt64(&s.atomic.storageSize, int64(entryBytes(&events[i]))) } - s.mu.store.Add(blockingTxnID, events[i]) + s.mu.store.Add(events[i].Hash(), events[i]) } } @@ -333,7 +332,7 @@ func (s *eventStore) resolutionIntervalWithJitter() time.Duration { } func entryBytes(event *contentionpb.ExtendedContentionEvent) int { - // Since we store the blocking txn's txnID as the key to the unordered cache, - // this is means we are storing another copy of uuid. - return event.Size() + uuid.UUID{}.Size() + // Since we store the event's hash as the key to the unordered cache, + // this is means we are storing another copy of uint64 (8 bytes). 
+ return event.Size() + 8 } diff --git a/pkg/sql/contention/event_store_test.go b/pkg/sql/contention/event_store_test.go index 42914edc7281..f9700bb4e511 100644 --- a/pkg/sql/contention/event_store_test.go +++ b/pkg/sql/contention/event_store_test.go @@ -201,10 +201,14 @@ func randomlyGenerateTestData(testSize int, numOfCoordinator int) []testData { tcs := make([]testData, 0, testSize) for i := 0; i < testSize; i++ { tcs = append(tcs, testData{ - ResolvedTxnID: contentionpb.ResolvedTxnID{ + blockingTxn: contentionpb.ResolvedTxnID{ TxnID: uuid.FastMakeV4(), TxnFingerprintID: roachpb.TransactionFingerprintID(math.MaxUint64 - uint64(i)), }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: roachpb.TransactionFingerprintID(math.MaxUint64/2 - uint64(i)), + }, coordinatorNodeID: strconv.Itoa(rand.Intn(numOfCoordinator)), }) } diff --git a/pkg/sql/contention/resolver.go b/pkg/sql/contention/resolver.go index c19f26a30399..ecf18646637f 100644 --- a/pkg/sql/contention/resolver.go +++ b/pkg/sql/contention/resolver.go @@ -12,6 +12,7 @@ package contention import ( "context" + "math" "sort" "strconv" @@ -83,6 +84,11 @@ const ( // retry resolving until giving up. This needs to be a finite number to handle // the case where the node is permanently removed from the cluster. retryBudgetForRPCFailure = uint32(3) + + // retryBudgetForTxnInProgress is a special value indicating that the resolver should + // indefinitely retry the resolution. This is because the retry is due to the + // transaction is still in progress. 
+ retryBudgetForTxnInProgress = uint32(math.MaxUint32) ) // ResolverEndpoint is an alias for the TxnIDResolution RPC endpoint in the @@ -96,7 +102,10 @@ type resolverQueueImpl struct { unresolvedEvents []contentionpb.ExtendedContentionEvent resolvedEvents []contentionpb.ExtendedContentionEvent - remainingRetries map[uuid.UUID]uint32 + // remainingRetries stores a mapping of each contention event to its + // remaining number of retries attempts. The key in the map is the hash of + // the contention event. + remainingRetries map[uint64]uint32 } resolverEndpoint ResolverEndpoint @@ -111,7 +120,7 @@ func newResolver(endpoint ResolverEndpoint, sizeHint int) *resolverQueueImpl { s.mu.unresolvedEvents = make([]contentionpb.ExtendedContentionEvent, 0, sizeHint) s.mu.resolvedEvents = make([]contentionpb.ExtendedContentionEvent, 0, sizeHint) - s.mu.remainingRetries = make(map[uuid.UUID]uint32, sizeHint) + s.mu.remainingRetries = make(map[uint64]uint32, sizeHint) return s } @@ -160,43 +169,54 @@ func (q *resolverQueueImpl) resolveLocked(ctx context.Context) error { // by observing some node'q load metrics (e.g. QPS value) and start // self-throttling once that QPS value exceed certain value. - req := makeRPCRequestFromBatch(currentBatch) - resp, err := q.resolverEndpoint(ctx, req) + blockingTxnIDsReq, waitingTxnIDsReq := makeRPCRequestsFromBatch(currentBatch) + + blockingTxnIDsResp, err := q.resolverEndpoint(ctx, blockingTxnIDsReq) + if err != nil { + allErrors = errors.CombineErrors(allErrors, err) + } + + waitingTxnIDsResp, err := q.resolverEndpoint(ctx, waitingTxnIDsReq) if err != nil { - q.maybeRequeueBatchLocked(currentBatch, retryBudgetForRPCFailure) - // Read next batch of unresolved contention events. 
- currentBatch, remaining = readUntilNextCoordinatorID(remaining) allErrors = errors.CombineErrors(allErrors, err) - continue } - resolvedTxnIDs, inProgressTxnIDs := extractResolvedAndInProgressTxnIDs(resp) + + resolvedBlockingTxnIDs, inProgressBlockingTxnIDs := extractResolvedAndInProgressTxnIDs(blockingTxnIDsResp) + resolvedWaitingTxnIDs, inProgressWaitingTxnIDs := extractResolvedAndInProgressTxnIDs(waitingTxnIDsResp) for _, event := range currentBatch { - // If the coordinator node indicates that it is aware of the requested - // txnID but does not yet have the corresponding txnFingerprintID, - // (e.g. when the transaction is still executing), we re-queue - // the contention event, so we will check in with the coordinator node - // again later. In this case, we don't want to update the retry - // record since we are confident that the txnID entry on the coordinator - // node has not yet being evicted. - if _, ok := inProgressTxnIDs[event.BlockingEvent.TxnMeta.ID]; ok { - q.mu.unresolvedEvents = append(q.mu.unresolvedEvents, event) - // Clear any retry count if there is any. - delete(q.mu.remainingRetries, event.BlockingEvent.TxnMeta.ID) - continue + needToRetryDueToBlockingTxnID, initialRetryBudgetDueToBlockingTxnID := + maybeUpdateTxnFingerprintID( + event.BlockingEvent.TxnMeta.ID, + &event.BlockingTxnFingerprintID, + resolvedBlockingTxnIDs, + inProgressBlockingTxnIDs, + ) + + needToRetryDueToWaitingTxnID, initialRetryBudgetDueToWaitingTxnID := + maybeUpdateTxnFingerprintID( + event.WaitingTxnID, + &event.WaitingTxnFingerprintID, + resolvedWaitingTxnIDs, + inProgressWaitingTxnIDs, + ) + + // The initial retry budget is + // max( + // initialRetryBudgetDueToBlockingTxnID, + // initialRetryBudgetDueToWaitingTxnID, + // ). 
+ initialRetryBudget := initialRetryBudgetDueToBlockingTxnID + if initialRetryBudget < initialRetryBudgetDueToWaitingTxnID { + initialRetryBudget = initialRetryBudgetDueToWaitingTxnID } - // If we successfully resolveLocked the transaction ID, we append it to the - // resolvedEvent slice and clear remaining retry count if there is any. - if txnFingerprintID, ok := resolvedTxnIDs[event.BlockingEvent.TxnMeta.ID]; ok { - event.BlockingTxnFingerprintID = txnFingerprintID + if needToRetryDueToBlockingTxnID || needToRetryDueToWaitingTxnID { + q.maybeRequeueEventForRetryLocked(event, initialRetryBudget) + } else { q.mu.resolvedEvents = append(q.mu.resolvedEvents, event) - - delete(q.mu.remainingRetries, event.BlockingEvent.TxnMeta.ID) - continue + delete(q.mu.remainingRetries, event.Hash()) } - - q.maybeRequeueEventForRetryLocked(event, retryBudgetForMissingResult) } currentBatch, remaining = readUntilNextCoordinatorID(remaining) @@ -205,32 +225,71 @@ func (q *resolverQueueImpl) resolveLocked(ctx context.Context) error { return allErrors } -func (q *resolverQueueImpl) maybeRequeueBatchLocked( - batch []contentionpb.ExtendedContentionEvent, initialBudget uint32, -) { - for _, event := range batch { - q.maybeRequeueEventForRetryLocked(event, initialBudget) +func maybeUpdateTxnFingerprintID( + txnID uuid.UUID, + existingTxnFingerprintID *roachpb.TransactionFingerprintID, + resolvedTxnIDs, inProgressTxnIDs map[uuid.UUID]roachpb.TransactionFingerprintID, +) (needToRetry bool, initialRetryBudget uint32) { + // This means the txnID has already been resolved into transaction fingerprint + // ID. + if *existingTxnFingerprintID != roachpb.InvalidTransactionFingerprintID { + return false /* needToRetry */, 0 /* initialRetryBudget */ + } + + // Sometimes DistSQL engine is used in weird ways. It is possible for a + // DistSQL flow to exist without being associated with any transactions and + // can still experience contentions. When that happens, we don't attempt to + // resolve it. 
+ if uuid.Nil.Equal(txnID) { + return false /* needToRetry */, 0 /* initialRetryBudget */ + } + + if resolvedTxnIDs == nil { + return true /* needToRetry */, retryBudgetForRPCFailure + } + + if _, ok := inProgressTxnIDs[txnID]; ok { + return true /* needToRetry */, retryBudgetForTxnInProgress + } + + if inProgressTxnIDs == nil { + return true /* needToRetry */, retryBudgetForRPCFailure + } + + if txnFingerprintID, ok := resolvedTxnIDs[txnID]; ok { + *existingTxnFingerprintID = txnFingerprintID + return false /* needToRetry */, 0 /* initialRetryBudget */ } + + return true /* needToRetry */, retryBudgetForMissingResult } func (q *resolverQueueImpl) maybeRequeueEventForRetryLocked( event contentionpb.ExtendedContentionEvent, initialBudget uint32, ) (requeued bool) { - // If we fail to resolve the result, we look up this event's remaining retry - // count. If its retry budget is exhausted, we discard it. Else, we - // re-queue the event for retry and decrement its retry budget for the - // event. - remainingRetryBudget, ok := q.mu.remainingRetries[event.BlockingEvent.TxnMeta.ID] - if !ok { - remainingRetryBudget = initialBudget + var remainingRetryBudget uint32 + var ok bool + + if initialBudget == retryBudgetForTxnInProgress { + delete(q.mu.remainingRetries, event.Hash()) } else { - remainingRetryBudget-- - } - q.mu.remainingRetries[event.BlockingEvent.TxnMeta.ID] = remainingRetryBudget + // If we fail to resolve the result, we look up this event's remaining retry + // count. If its retry budget is exhausted, we discard it. Else, we + // re-queue the event for retry and decrement its retry budget for the + // event. 
+ remainingRetryBudget, ok = q.mu.remainingRetries[event.Hash()] + if !ok { + remainingRetryBudget = initialBudget + } else { + remainingRetryBudget-- + } - if remainingRetryBudget == 0 { - delete(q.mu.remainingRetries, event.BlockingEvent.TxnMeta.ID) - return false /* requeued */ + q.mu.remainingRetries[event.Hash()] = remainingRetryBudget + + if remainingRetryBudget == 0 { + delete(q.mu.remainingRetries, event.Hash()) + return false /* requeued */ + } } q.mu.unresolvedEvents = append(q.mu.unresolvedEvents, event) @@ -257,6 +316,10 @@ func readUntilNextCoordinatorID( func extractResolvedAndInProgressTxnIDs( resp *serverpb.TxnIDResolutionResponse, ) (resolvedTxnIDs, inProgressTxnIDs map[uuid.UUID]roachpb.TransactionFingerprintID) { + if resp == nil { + return nil /* resolvedTxnID */, nil /* inProgressTxnIDs */ + } + resolvedTxnIDs = make(map[uuid.UUID]roachpb.TransactionFingerprintID, len(resp.ResolvedTxnIDs)) inProgressTxnIDs = make(map[uuid.UUID]roachpb.TransactionFingerprintID, len(resp.ResolvedTxnIDs)) @@ -271,17 +334,30 @@ func extractResolvedAndInProgressTxnIDs( return resolvedTxnIDs, inProgressTxnIDs } -func makeRPCRequestFromBatch( +// makeRPCRequestsFromBatch creates two TxnIDResolution RPC requests from the +// batch of contentionpb.ExtendedContentionEvent. If the event already contains +// a resolved transaction fingerprint ID, then the corresponding transaction ID +// is omitted from the RPC request payload. 
+func makeRPCRequestsFromBatch( batch []contentionpb.ExtendedContentionEvent, -) *serverpb.TxnIDResolutionRequest { - req := &serverpb.TxnIDResolutionRequest{ +) (blockingTxnIDReq, waitingTxnIDReq *serverpb.TxnIDResolutionRequest) { + blockingTxnIDReq = &serverpb.TxnIDResolutionRequest{ CoordinatorID: strconv.Itoa(int(batch[0].BlockingEvent.TxnMeta.CoordinatorNodeID)), TxnIDs: make([]uuid.UUID, 0, len(batch)), } + waitingTxnIDReq = &serverpb.TxnIDResolutionRequest{ + CoordinatorID: "local", + TxnIDs: make([]uuid.UUID, 0, len(batch)), + } - for _, event := range batch { - req.TxnIDs = append(req.TxnIDs, event.BlockingEvent.TxnMeta.ID) + for i := range batch { + if batch[i].BlockingTxnFingerprintID == roachpb.InvalidTransactionFingerprintID { + blockingTxnIDReq.TxnIDs = append(blockingTxnIDReq.TxnIDs, batch[i].BlockingEvent.TxnMeta.ID) + } + if batch[i].WaitingTxnFingerprintID == roachpb.InvalidTransactionFingerprintID { + waitingTxnIDReq.TxnIDs = append(waitingTxnIDReq.TxnIDs, batch[i].WaitingTxnID) + } } - return req + return blockingTxnIDReq, waitingTxnIDReq } diff --git a/pkg/sql/contention/resolver_test.go b/pkg/sql/contention/resolver_test.go index 09cb1fc888e2..cbfba28997e3 100644 --- a/pkg/sql/contention/resolver_test.go +++ b/pkg/sql/contention/resolver_test.go @@ -26,7 +26,8 @@ import ( ) type testData struct { - contentionpb.ResolvedTxnID + blockingTxn contentionpb.ResolvedTxnID + waitingTxn contentionpb.ResolvedTxnID coordinatorNodeID string } @@ -38,45 +39,69 @@ func TestResolver(t *testing.T) { t.Run("normal_resolution", func(t *testing.T) { tcs := []testData{ { - ResolvedTxnID: contentionpb.ResolvedTxnID{ + blockingTxn: contentionpb.ResolvedTxnID{ TxnID: uuid.FastMakeV4(), TxnFingerprintID: 100, }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: 900, + }, coordinatorNodeID: "1", }, { - ResolvedTxnID: contentionpb.ResolvedTxnID{ + blockingTxn: contentionpb.ResolvedTxnID{ TxnID: uuid.FastMakeV4(), 
TxnFingerprintID: 101, }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: 901, + }, coordinatorNodeID: "1", }, { - ResolvedTxnID: contentionpb.ResolvedTxnID{ + blockingTxn: contentionpb.ResolvedTxnID{ TxnID: uuid.FastMakeV4(), TxnFingerprintID: 102, }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: 903, + }, coordinatorNodeID: "1", }, { - ResolvedTxnID: contentionpb.ResolvedTxnID{ + blockingTxn: contentionpb.ResolvedTxnID{ TxnID: uuid.FastMakeV4(), TxnFingerprintID: 200, }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: 904, + }, coordinatorNodeID: "2", }, { - ResolvedTxnID: contentionpb.ResolvedTxnID{ + blockingTxn: contentionpb.ResolvedTxnID{ TxnID: uuid.FastMakeV4(), TxnFingerprintID: 201, }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: 905, + }, coordinatorNodeID: "2", }, { - ResolvedTxnID: contentionpb.ResolvedTxnID{ + blockingTxn: contentionpb.ResolvedTxnID{ TxnID: uuid.FastMakeV4(), TxnFingerprintID: 300, }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: 906, + }, coordinatorNodeID: "3", }, } @@ -92,40 +117,75 @@ func TestResolver(t *testing.T) { expected = sortResolvedContentionEvents(expected) actual = sortResolvedContentionEvents(actual) - require.Equal(t, expected, actual) + require.Empty(t, resolver.mu.remainingRetries) require.Empty(t, resolver.mu.unresolvedEvents) + require.Equal(t, expected, actual) }) t.Run("retry_after_encountering_provisional_value", func(t *testing.T) { // Reset the status server from previous test. 
statusServer.clear() - activeTxnID := uuid.FastMakeV4() + // activeTxnID := uuid.FastMakeV4() tcs := []testData{ { - ResolvedTxnID: contentionpb.ResolvedTxnID{ + blockingTxn: contentionpb.ResolvedTxnID{ TxnID: uuid.FastMakeV4(), TxnFingerprintID: 101, }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: 901, + }, coordinatorNodeID: "3", }, { - // This is a provisional entry in TxnID Cache, representing that - // the transaction is open and its fingerprint ID is not yet - // available. - ResolvedTxnID: contentionpb.ResolvedTxnID{ - TxnID: activeTxnID, + blockingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), TxnFingerprintID: roachpb.InvalidTransactionFingerprintID, }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: 902, + }, coordinatorNodeID: "1", }, { - ResolvedTxnID: contentionpb.ResolvedTxnID{ + blockingTxn: contentionpb.ResolvedTxnID{ TxnID: uuid.FastMakeV4(), TxnFingerprintID: 102, }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: 903, + }, coordinatorNodeID: "1", }, + { + blockingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: 201, + }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: roachpb.InvalidTransactionFingerprintID, + }, + coordinatorNodeID: "2", + }, + { + blockingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: roachpb.InvalidTransactionFingerprintID, + }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: roachpb.InvalidTransactionFingerprintID, + }, + coordinatorNodeID: "2", + }, } + inProgressContentionEventDueToInProgressBlockingTxn := tcs[1] + inProgressContentionEventDueToInProgressWaitingTxn := tcs[3] + inProgressContentionEventDueToInProgressBlockingAndWaitingTxn := tcs[4] populateFakeStatusServerCluster(statusServer, tcs) input, expected := 
generateUnresolvedContentionEventsFromTestData(t, tcs, time.Time{}) @@ -137,34 +197,44 @@ func TestResolver(t *testing.T) { actual = sortResolvedContentionEvents(actual) require.Equal(t, expected, actual) - require.Equal(t, 1 /* expected */, len(resolver.mu.unresolvedEvents), - "expected resolver to retry resolution for active txns, "+ + require.Equal(t, 3 /* expected */, len(resolver.mu.unresolvedEvents), + "expected resolver to retry resolution for in-progress txns, "+ "but it did not") - require.True(t, activeTxnID.Equal(resolver.mu.unresolvedEvents[0].BlockingEvent.TxnMeta.ID)) + require.True(t, + inProgressContentionEventDueToInProgressBlockingTxn.blockingTxn.TxnID.Equal( + resolver.mu.unresolvedEvents[0].BlockingEvent.TxnMeta.ID)) require.Empty(t, resolver.mu.remainingRetries, "expected resolver not to create retry record for active txns, "+ "but it did") // Create 1 new entry to simulate another new txn that finished execution, // and update the existing entry to have a valid transaction fingerprint id. 
- newTxns := []testData{ + newData := []testData{ { - ResolvedTxnID: contentionpb.ResolvedTxnID{ + blockingTxn: contentionpb.ResolvedTxnID{ TxnID: uuid.FastMakeV4(), TxnFingerprintID: 2000, }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: 904, + }, coordinatorNodeID: "2", }, { - ResolvedTxnID: contentionpb.ResolvedTxnID{ - TxnID: activeTxnID, + blockingTxn: contentionpb.ResolvedTxnID{ + TxnID: inProgressContentionEventDueToInProgressBlockingTxn.blockingTxn.TxnID, TxnFingerprintID: 1000, }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: inProgressContentionEventDueToInProgressBlockingTxn.waitingTxn.TxnID, + TxnFingerprintID: inProgressContentionEventDueToInProgressBlockingTxn.waitingTxn.TxnFingerprintID, + }, coordinatorNodeID: "1", }, } - populateFakeStatusServerCluster(statusServer, newTxns) - newInput, newExpected := generateUnresolvedContentionEventsFromTestData(t, newTxns, time.Time{}) + populateFakeStatusServerCluster(statusServer, newData) + newInput, newExpected := generateUnresolvedContentionEventsFromTestData(t, newData, time.Time{}) // The txn with 'activeTxnID' is already present in the resolver. Omit it // from the input. newInput = newInput[:1] @@ -178,10 +248,64 @@ func TestResolver(t *testing.T) { actual = sortResolvedContentionEvents(actual) require.Equal(t, newExpected, actual) - // Nothing should be left inside the resolver after we are done. - require.Empty(t, resolver.mu.unresolvedEvents) + // Two other contention events due to in progress txn should continue to retry. + require.Equal(t, 2 /* expected */, len(resolver.mu.unresolvedEvents)) require.Empty(t, resolver.mu.remainingRetries) require.Empty(t, resolver.mu.resolvedEvents) + + // The inProgressContentionEventDueToInProgressBlockingAndWaitingTxn's + // blocking txn finishes executing. 
However, the waiting txn is still + // in-progress, this should cause the resolver to update the txn fingerprint + // ID of the blocking txn, and still continue to retry resolution. + inProgressContentionEventDueToInProgressBlockingAndWaitingTxn.blockingTxn.TxnFingerprintID = 2001 + newData = []testData{inProgressContentionEventDueToInProgressBlockingAndWaitingTxn} + populateFakeStatusServerCluster(statusServer, newData) + + // We don't expect dequeue() to return anything, since the waiting txn is + // still executing. + actual, err = resolver.dequeue(ctx) + require.NoError(t, err) + require.Empty(t, actual) + + require.Equal(t, 2 /* expected */, len(resolver.mu.unresolvedEvents)) + require.Empty(t, resolver.mu.remainingRetries) + require.Empty(t, resolver.mu.resolvedEvents) + + // Even we are retrying without creating retry budget, we should still update + // the fields on the contention event. + require.Equal(t, + roachpb.InvalidTransactionFingerprintID, + resolver.mu.unresolvedEvents[1].WaitingTxnFingerprintID) + require.Equal(t, + inProgressContentionEventDueToInProgressBlockingAndWaitingTxn.blockingTxn.TxnFingerprintID, + resolver.mu.unresolvedEvents[1].BlockingTxnFingerprintID, + "expected txnID (%s) to have updated blocking txn fingerprint "+ + "id %d, but it is %d", + inProgressContentionEventDueToInProgressBlockingAndWaitingTxn.blockingTxn.TxnID, + inProgressContentionEventDueToInProgressBlockingAndWaitingTxn.blockingTxn.TxnFingerprintID, + resolver.mu.unresolvedEvents[1].BlockingTxnFingerprintID, + ) + + // All in-progress transactions finish execution, this means all of their + // txn fingerprint IDs become available. This should allow resolver to + // completely finishes resolution. 
+ inProgressContentionEventDueToInProgressBlockingAndWaitingTxn.waitingTxn.TxnFingerprintID = 2002 + inProgressContentionEventDueToInProgressWaitingTxn.waitingTxn.TxnFingerprintID = 2003 + newData = []testData{ + inProgressContentionEventDueToInProgressBlockingAndWaitingTxn, + inProgressContentionEventDueToInProgressWaitingTxn, + } + populateFakeStatusServerCluster(statusServer, newData) + + _, newExpected = generateUnresolvedContentionEventsFromTestData(t, newData, time.Time{}) + actual, err = resolver.dequeue(ctx) + + require.NoError(t, err) + require.Equal(t, sortResolvedContentionEvents(newExpected), sortResolvedContentionEvents(actual)) + + require.Empty(t, resolver.mu.resolvedEvents) + require.Empty(t, resolver.mu.remainingRetries) + require.Empty(t, resolver.mu.unresolvedEvents) }) t.Run("retry_after_missing_value", func(t *testing.T) { @@ -189,24 +313,36 @@ func TestResolver(t *testing.T) { missingTxnID := uuid.FastMakeV4() tcs := []testData{ { - ResolvedTxnID: contentionpb.ResolvedTxnID{ + blockingTxn: contentionpb.ResolvedTxnID{ TxnID: uuid.FastMakeV4(), TxnFingerprintID: 101, }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: 901, + }, coordinatorNodeID: "3", }, { - ResolvedTxnID: contentionpb.ResolvedTxnID{ + blockingTxn: contentionpb.ResolvedTxnID{ TxnID: uuid.FastMakeV4(), TxnFingerprintID: 102, }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: 902, + }, coordinatorNodeID: "1", }, { - ResolvedTxnID: contentionpb.ResolvedTxnID{ + blockingTxn: contentionpb.ResolvedTxnID{ TxnID: missingTxnID, TxnFingerprintID: 1000, }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: 903, + }, coordinatorNodeID: "1", }, } @@ -214,6 +350,7 @@ func TestResolver(t *testing.T) { // evicted txnID from txnIDCache. 
populateFakeStatusServerCluster(statusServer, tcs[:2]) input, expected := generateUnresolvedContentionEventsFromTestData(t, tcs, time.Time{}) + missingTxnMissingRetryKey := input[2].Hash() resolver.enqueue(input) actual, err := resolver.dequeue(ctx) @@ -224,7 +361,7 @@ func TestResolver(t *testing.T) { require.Equal(t, sortResolvedContentionEvents(expected[:2]), sortResolvedContentionEvents(actual)) require.Equal(t, 1, len(resolver.mu.remainingRetries)) - remainingRetryBudget, foundRetryRecord := resolver.mu.remainingRetries[missingTxnID] + remainingRetryBudget, foundRetryRecord := resolver.mu.remainingRetries[missingTxnMissingRetryKey] require.True(t, foundRetryRecord) require.Equal(t, retryBudgetForMissingResult, remainingRetryBudget) @@ -251,31 +388,47 @@ func TestResolver(t *testing.T) { statusServer.clear() tcs := []testData{ { - ResolvedTxnID: contentionpb.ResolvedTxnID{ + blockingTxn: contentionpb.ResolvedTxnID{ TxnID: uuid.FastMakeV4(), TxnFingerprintID: 201, }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: 901, + }, coordinatorNodeID: "2", }, { - ResolvedTxnID: contentionpb.ResolvedTxnID{ + blockingTxn: contentionpb.ResolvedTxnID{ TxnID: missingTxnID1, TxnFingerprintID: 301, }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: 902, + }, coordinatorNodeID: "3", }, { - ResolvedTxnID: contentionpb.ResolvedTxnID{ + blockingTxn: contentionpb.ResolvedTxnID{ TxnID: uuid.FastMakeV4(), TxnFingerprintID: 100, }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: 903, + }, coordinatorNodeID: "1", }, { - ResolvedTxnID: contentionpb.ResolvedTxnID{ + blockingTxn: contentionpb.ResolvedTxnID{ TxnID: uuid.FastMakeV4(), TxnFingerprintID: 101, }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: 904, + }, coordinatorNodeID: "1", }, } @@ -287,6 +440,8 @@ func TestResolver(t *testing.T) { "3" /* coordinatorNode */, 
injectedErr) input, expected := generateUnresolvedContentionEventsFromTestData(t, tcs, time.Time{}) + missingTxnRemainingRetryKey1 := input[1].Hash() + expected = sortResolvedContentionEvents(expected) expectedWithOnlyResultsFromAvailableNodes := make([]contentionpb.ExtendedContentionEvent, 0, len(expected)) for _, event := range expected { @@ -311,7 +466,7 @@ func TestResolver(t *testing.T) { require.Equal(t, 1, len(resolver.mu.remainingRetries), "expected to have a retry record after RPC failure, but the "+ "retry record is missing") - remainingRetryBudget, found := resolver.mu.remainingRetries[missingTxnID1] + remainingRetryBudget, found := resolver.mu.remainingRetries[missingTxnRemainingRetryKey1] require.True(t, found) require.Equal(t, retryBudgetForRPCFailure, remainingRetryBudget) @@ -320,10 +475,14 @@ func TestResolver(t *testing.T) { missingTxnID2 := uuid.FastMakeV4() tcs = []testData{ { - ResolvedTxnID: contentionpb.ResolvedTxnID{ + blockingTxn: contentionpb.ResolvedTxnID{ TxnID: missingTxnID2, TxnFingerprintID: 202, }, + waitingTxn: contentionpb.ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: 905, + }, coordinatorNodeID: "2", }, } @@ -332,6 +491,7 @@ func TestResolver(t *testing.T) { statusServer.setStatusServerError( "2" /* coordinatorNodeID */, injectedErr) input2, expected2 := generateUnresolvedContentionEventsFromTestData(t, tcs, time.Time{}) + missingTxnRemainingRetryKey2 := input2[0].Hash() resolver.enqueue(input2) require.Equal(t, 2, len(resolver.mu.unresolvedEvents)) @@ -339,13 +499,13 @@ func TestResolver(t *testing.T) { require.ErrorIs(t, resolver.resolveLocked(ctx), injectedErr) require.Equal(t, 2, len(resolver.mu.remainingRetries)) - remainingRetryBudget, found = resolver.mu.remainingRetries[missingTxnID1] + remainingRetryBudget, found = resolver.mu.remainingRetries[missingTxnRemainingRetryKey1] require.True(t, found) require.Equal(t, retryBudgetForRPCFailure-1, remainingRetryBudget, "expect retry budget be decremented after 
consecutive failures, "+ "but it was not") - remainingRetryBudget, found = resolver.mu.remainingRetries[missingTxnID2] + remainingRetryBudget, found = resolver.mu.remainingRetries[missingTxnRemainingRetryKey2] require.True(t, found) require.Equal(t, retryBudgetForRPCFailure, remainingRetryBudget) @@ -373,7 +533,7 @@ func sortResolvedContentionEvents( events []contentionpb.ExtendedContentionEvent, ) []contentionpb.ExtendedContentionEvent { sort.Slice(events, func(i, j int) bool { - return events[i].BlockingTxnFingerprintID < events[j].BlockingTxnFingerprintID + return events[i].Hash() < events[j].Hash() }) return events } @@ -385,19 +545,27 @@ func generateUnresolvedContentionEventsFromTestData( expected []contentionpb.ExtendedContentionEvent, ) { for _, tc := range tcs { - event := contentionpb.ExtendedContentionEvent{} - event.BlockingEvent.TxnMeta.ID = tc.TxnID + inputEvent := contentionpb.ExtendedContentionEvent{} + inputEvent.BlockingEvent.TxnMeta.ID = tc.blockingTxn.TxnID coordinatorID, err := strconv.Atoi(tc.coordinatorNodeID) require.NoError(t, err) - event.BlockingEvent.TxnMeta.CoordinatorNodeID = int32(coordinatorID) - input = append(input, event) - - if tc.TxnFingerprintID != roachpb.InvalidTransactionFingerprintID { - resolvedEvent := contentionpb.ExtendedContentionEvent{} - resolvedEvent.BlockingEvent = event.BlockingEvent - resolvedEvent.BlockingTxnFingerprintID = tc.TxnFingerprintID - resolvedEvent.CollectionTs = collectionTs - expected = append(expected, resolvedEvent) + inputEvent.BlockingEvent.TxnMeta.CoordinatorNodeID = int32(coordinatorID) + inputEvent.WaitingTxnID = tc.waitingTxn.TxnID + input = append(input, inputEvent) + + if tc.blockingTxn.TxnFingerprintID != roachpb.InvalidTransactionFingerprintID && + tc.waitingTxn.TxnFingerprintID != roachpb.InvalidTransactionFingerprintID { + expectedResolvedEvent := contentionpb.ExtendedContentionEvent{} + + expectedResolvedEvent.BlockingEvent = inputEvent.BlockingEvent + 
expectedResolvedEvent.BlockingTxnFingerprintID = tc.blockingTxn.TxnFingerprintID + + expectedResolvedEvent.CollectionTs = collectionTs + + expectedResolvedEvent.WaitingTxnID = inputEvent.WaitingTxnID + expectedResolvedEvent.WaitingTxnFingerprintID = tc.waitingTxn.TxnFingerprintID + + expected = append(expected, expectedResolvedEvent) } } return input, expected @@ -405,6 +573,7 @@ func generateUnresolvedContentionEventsFromTestData( func populateFakeStatusServerCluster(f fakeStatusServerCluster, tcs []testData) { for _, tc := range tcs { - f.setTxnIDEntry(tc.coordinatorNodeID, tc.TxnID, tc.TxnFingerprintID) + f.setTxnIDEntry(tc.coordinatorNodeID, tc.blockingTxn.TxnID, tc.blockingTxn.TxnFingerprintID) + f.setTxnIDEntry("local", tc.waitingTxn.TxnID, tc.waitingTxn.TxnFingerprintID) } } diff --git a/pkg/sql/contentionpb/BUILD.bazel b/pkg/sql/contentionpb/BUILD.bazel index 3a507d485fca..3383e0c5d0dc 100644 --- a/pkg/sql/contentionpb/BUILD.bazel +++ b/pkg/sql/contentionpb/BUILD.bazel @@ -1,6 +1,6 @@ load("@rules_proto//proto:defs.bzl", "proto_library") load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "contentionpb", @@ -8,7 +8,11 @@ go_library( embed = [":contentionpb_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/sql/contentionpb", visibility = ["//visibility:public"], - deps = ["//pkg/util/uuid"], + deps = [ + "//pkg/util", + "//pkg/util/encoding", + "//pkg/util/uuid", + ], ) proto_library( @@ -37,3 +41,15 @@ go_proto_library( "@com_github_gogo_protobuf//gogoproto", ], ) + +go_test( + name = "contentionpb_test", + srcs = ["contention_test.go"], + embed = [":contentionpb"], + deps = [ + "//pkg/util", + "//pkg/util/timeutil", + "//pkg/util/uuid", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/sql/contentionpb/contention.go b/pkg/sql/contentionpb/contention.go index 
753982e8c148..a927811a7f8a 100644 --- a/pkg/sql/contentionpb/contention.go +++ b/pkg/sql/contentionpb/contention.go @@ -14,6 +14,8 @@ import ( "fmt" "strings" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -74,3 +76,32 @@ func (r *ResolvedTxnID) Valid() bool { func (e *ExtendedContentionEvent) Valid() bool { return !uuid.Nil.Equal(e.BlockingEvent.TxnMeta.ID) } + +// Hash returns a hash that's unique to ExtendedContentionEvent using +// blocking txn's txnID, waiting txn's txnID and the event collection timestamp. +func (e *ExtendedContentionEvent) Hash() uint64 { + hash := util.MakeFNV64() + hashUUID(e.BlockingEvent.TxnMeta.ID, &hash) + hashUUID(e.WaitingTxnID, &hash) + hash.Add(uint64(e.CollectionTs.UnixMilli())) + return hash.Sum() +} + +// hashUUID adds the hash of the uuid into the fnv. +// A UUID is a 16-byte array. To hash a UUID, we treat it as two uint64 integers, +// since a uint64 is 8 bytes. This is why we decode the byte array twice and add +// the resulting uint64 into the fnv each time. +func hashUUID(u uuid.UUID, fnv *util.FNV64) { + b := u.GetBytes() + + b, val, err := encoding.DecodeUint64Descending(b) + if err != nil { + panic(err) + } + fnv.Add(val) + _, val, err = encoding.DecodeUint64Descending(b) + if err != nil { + panic(err) + } + fnv.Add(val) +} diff --git a/pkg/sql/contentionpb/contention_test.go b/pkg/sql/contentionpb/contention_test.go new file mode 100644 index 000000000000..765fd6dbe027 --- /dev/null +++ b/pkg/sql/contentionpb/contention_test.go @@ -0,0 +1,67 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package contentionpb + +import ( + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +func TestExtendedContentionEventHash(t *testing.T) { + event1 := ExtendedContentionEvent{} + event1.BlockingEvent.TxnMeta.ID = uuid.FastMakeV4() + event1.WaitingTxnID = uuid.FastMakeV4() + event1.CollectionTs = timeutil.Now() + + eventWithDifferentBlockingTxnID := event1 + eventWithDifferentBlockingTxnID.BlockingEvent.TxnMeta.ID = uuid.FastMakeV4() + + require.NotEqual(t, eventWithDifferentBlockingTxnID.Hash(), event1.Hash()) + + eventWithDifferentWaitingTxnID := event1 + eventWithDifferentWaitingTxnID.WaitingTxnID = uuid.FastMakeV4() + require.NotEqual(t, eventWithDifferentWaitingTxnID.Hash(), event1.Hash()) + + eventWithDifferentCollectionTs := event1 + eventWithDifferentCollectionTs.CollectionTs = event1.CollectionTs.Add(time.Second) + require.NotEqual(t, eventWithDifferentCollectionTs.Hash(), event1.Hash()) +} + +func TestHashingUUID(t *testing.T) { + // Ensure that if two UUIDs are only different in the first or last 8 bytes, + they still produce different hashes. 
+ uuid1 := uuid.UUID{ + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, + } + fnv1 := util.MakeFNV64() + hashUUID(uuid1, &fnv1) + + uuid2 := uuid.UUID{ + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 17, + } + fnv2 := util.MakeFNV64() + hashUUID(uuid2, &fnv2) + + uuid3 := uuid.UUID{ + 0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, + } + fnv3 := util.MakeFNV64() + hashUUID(uuid3, &fnv3) + + require.NotEqual(t, fnv1.Sum(), fnv2.Sum()) + require.NotEqual(t, fnv1.Sum(), fnv3.Sum()) + require.NotEqual(t, fnv2.Sum(), fnv3.Sum()) +}