diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go index abf7ed3e1a64..c4a201fda2c7 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go @@ -187,13 +187,10 @@ func (h *txnHeartbeater) SendLocked( ba.Txn.Key = anchor } - // Start the heartbeat loop if it has not already started and this batch - // is not intending to commit/abort the transaction. + // Start the heartbeat loop if it has not already started. if !h.mu.loopStarted { - if !hasET { - if err := h.startHeartbeatLoopLocked(ctx); err != nil { - return nil, roachpb.NewError(err) - } + if err := h.startHeartbeatLoopLocked(ctx); err != nil { + return nil, roachpb.NewError(err) } } } @@ -204,6 +201,10 @@ func (h *txnHeartbeater) SendLocked( // Set the EndTxn request's TxnHeartbeating flag. Set to true if // a hearbeat loop was started which indicates that transaction has // a transaction record. + // + // TODO(erikgrinaker): In v21.2 we always heartbeat the txn record, so + // this field is never used. However, we still need to set it when + // interacting with v21.1 nodes. We can remove this field in v22.1. et.TxnHeartbeating = h.mu.loopStarted // Preemptively stop the heartbeat loop in case of transaction abort. diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater_test.go index eae8e9748804..e3b0c54d92ab 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater_test.go @@ -216,9 +216,9 @@ func TestTxnHeartbeaterLoopStartedOnFirstLock(t *testing.T) { }) } -// TestTxnHeartbeaterLoopNotStartedFor1PC tests that the txnHeartbeater does -// not start a heartbeat loop if it detects a 1PC transaction. -func TestTxnHeartbeaterLoopNotStartedFor1PC(t *testing.T) { +// TestTxnHeartbeaterLoopStartedFor1PC tests that the txnHeartbeater +// starts a heartbeat loop if it detects a 1PC transaction. +func TestTxnHeartbeaterLoopStartedFor1PC(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() @@ -238,7 +238,7 @@ func TestTxnHeartbeaterLoopNotStartedFor1PC(t *testing.T) { require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[1].GetInner()) etReq := ba.Requests[1].GetInner().(*roachpb.EndTxnRequest) - require.False(t, etReq.TxnHeartbeating) + require.True(t, etReq.TxnHeartbeating) br := ba.CreateReply() br.Txn = ba.Txn @@ -250,8 +250,9 @@ func TestTxnHeartbeaterLoopNotStartedFor1PC(t *testing.T) { require.NotNil(t, br) th.mu.Lock() - require.False(t, th.mu.loopStarted) - require.False(t, th.heartbeatLoopRunningLocked()) + require.True(t, th.mu.loopStarted) + require.True(t, th.heartbeatLoopRunningLocked()) + th.closeLocked() th.mu.Unlock() } diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 87c6dfa87164..700be8ccca8f 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -11266,7 +11266,6 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) { run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error { et, etH := endTxnArgs(txn, true /* commit */) et.InFlightWrites = inFlightWrites - et.TxnHeartbeating = true return sendWrappedWithErr(etH, &et) }, expTxn: txnWithStagingStatusAndInFlightWrites, @@ -11279,7 +11278,6 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) { }, run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error { et, etH := endTxnArgs(txn, false /* commit */) - et.TxnHeartbeating = true return sendWrappedWithErr(etH, &et) }, // The transaction record will be eagerly GC-ed. @@ -11293,7 +11291,6 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) { }, run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error { et, etH := endTxnArgs(txn, true /* commit */) - et.TxnHeartbeating = true return sendWrappedWithErr(etH, &et) }, // The transaction record will be eagerly GC-ed. @@ -11307,7 +11304,6 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) { }, run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error { et, etH := endTxnArgs(txn, false /* commit */) - et.TxnHeartbeating = true return sendWrappedWithErr(etH, &et) }, expTxn: txnWithStatus(roachpb.ABORTED), @@ -11321,7 +11317,6 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) { }, run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error { et, etH := endTxnArgs(txn, true /* commit */) - et.TxnHeartbeating = true return sendWrappedWithErr(etH, &et) }, expTxn: txnWithStatus(roachpb.COMMITTED), @@ -12107,7 +12102,6 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) { run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error { et, etH := endTxnArgs(txn, true /* commit */) et.Sequence = 1 // qualify for 1PC - et.TxnHeartbeating = true return sendWrappedWithErr(etH, &et) }, expTxn: noTxnRecord, @@ -12124,7 +12118,6 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) { run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error { et, etH := endTxnArgs(txn, true /* commit */) et.Sequence = 1 // qualify for 1PC - et.TxnHeartbeating = true et.Require1PC = true return sendWrappedWithErr(etH, &et) }, diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index ba772fda70b3..e55455d9a532 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -406,22 +406,8 @@ func (r *Replica) canAttempt1PCEvaluation( // Check whether the txn record has already been created. If so, we can't // perform a 1PC evaluation because we need to clean up the record during // evaluation. - // - // We only perform this check if the transaction's EndTxn indicates that it - // has started its heartbeat loop. If not, the transaction cannot have an - // existing record. However, we perform it unconditionally under race to - // catch bugs. - arg, _ := ba.GetArg(roachpb.EndTxn) - etArg := arg.(*roachpb.EndTxnRequest) - if etArg.TxnHeartbeating || util.RaceEnabled { - if ok, err := batcheval.HasTxnRecord(ctx, r.store.Engine(), ba.Txn); err != nil { - return false, roachpb.NewError(err) - } else if ok { - if !etArg.TxnHeartbeating { - log.Fatalf(ctx, "non-heartbeating txn with txn record before EndTxn: %v", ba.Txn) - } - return false, nil - } + if ok, err := batcheval.HasTxnRecord(ctx, r.store.Engine(), ba.Txn); ok || err != nil { + return false, roachpb.NewError(err) } // The EndTxn checks whether the txn record can be created, but we're @@ -615,11 +601,6 @@ func (r *Replica) evaluate1PC( // have acquired unreplicated locks, so inform the concurrency manager that // it is finalized and than any unreplicated locks that it has acquired can // be released. - // - // TODO(nvanbenschoten): once we can rely on EndTxn.TxnHeartbeating being - // correct in v21.1, we can gate these notifications on TxnHeartbeating - // because we know that a transaction hasn't acquired any unreplicated - // locks if it hasn't started heartbeating. res.Local.UpdatedTxns = []*roachpb.Transaction{clonedTxn} res.Local.ResolvedLocks = make([]roachpb.LockUpdate, len(etArg.LockSpans)) for i, sp := range etArg.LockSpans { diff --git a/pkg/roachpb/api.pb.go b/pkg/roachpb/api.pb.go index 5a0160e269fd..6815a402cfc0 100644 --- a/pkg/roachpb/api.pb.go +++ b/pkg/roachpb/api.pb.go @@ -1745,6 +1745,10 @@ type EndTxnRequest struct { // // WARNING: this flag was introduced in the v20.2 release. v20.1 nodes may not // set this properly so it should not be relied upon for correctness. + // + // TODO(erikgrinaker): This flag is no longer used in v21.2 as we now always + // heartbeat the txn record, but we still need to set it when interacting with + // v21.1 nodes. It should be removed in v22.1. TxnHeartbeating bool `protobuf:"varint,10,opt,name=txn_heartbeating,json=txnHeartbeating,proto3" json:"txn_heartbeating,omitempty"` } diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index 5ebe6346c4d5..f51af1635957 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -669,6 +669,10 @@ message EndTxnRequest { // // WARNING: this flag was introduced in the v20.2 release. v20.1 nodes may not // set this properly so it should not be relied upon for correctness. + // + // TODO(erikgrinaker): This flag is no longer used in v21.2 as we now always + // heartbeat the txn record, but we still need to set it when interacting with + // v21.1 nodes. It should be removed in v22.1. bool txn_heartbeating = 10; reserved 7, 8; }