Skip to content

Commit

Permalink
kvcoord: always start TxnCoordSender heartbeat loop
Browse files Browse the repository at this point in the history
Previously, potential 1PC transactions (containing an `EndTxn` in the
same batch as writes) did not start the `TxnCoordSender` heartbeat loop,
and thus never wrote a transaction record. This was a performance
optimization to avoid checking for an existing record during write
evaluation. However, it could cause problems under heavy contention
where such transactions would abort each other as they did not heartbeat
their transaction record and thus would become invalid once the txn
timeout expired. This would apply even to transactions that turned out
to not use 1PC due to spanning multiple ranges, and thus are more
vulnerable to contention.

Running some benchmarks showed that this optimization in fact did not
have a detectable effect on performance (new always heartbeats):

```
name                          old ops/sec  new ops/sec  delta
kv0/enc=false/nodes=1/cpu=32   24.5k ± 8%   24.7k ± 3%   ~     (p=0.796 n=9+9)
kv0/enc=false/nodes=3/cpu=32   36.7k ± 4%   36.5k ± 4%   ~     (p=0.529 n=10+10)
kv95/enc=false/nodes=3/cpu=32  124k ± 7%    131k ± 2%  +5.56%  (p=0.002 n=10+8)
ycsb/A/nodes=3/cpu=32          27.7k ± 4%   28.1k ± 4%   ~     (p=0.315 n=9+10)

name                          old p50      new p50      delta
kv0/enc=false/nodes=1/cpu=32    2.50 ± 0%    2.50 ± 0%   ~     (all equal)
kv0/enc=false/nodes=3/cpu=32    4.70 ± 0%    4.70 ± 0%   ~     (all equal)
kv95/enc=false/nodes=3/cpu=32   1.00 ± 0%    1.00 ± 0%   ~     (all equal)
ycsb/A/nodes=3/cpu=32           2.44 ± 2%    2.45 ± 2%   ~     (p=1.000 n=10+10)

name                          old p95      new p95      delta
kv0/enc=false/nodes=1/cpu=32    4.79 ± 9%    4.70 ± 0%   ~     (p=0.350 n=9+7)
kv0/enc=false/nodes=3/cpu=32    10.7 ± 3%    10.8 ± 7%   ~     (p=1.000 n=10+10)
kv95/enc=false/nodes=3/cpu=32   5.02 ±10%    4.70 ± 0%  -6.37%  (p=0.003 n=10+8)
ycsb/A/nodes=3/cpu=32           9.10 ± 8%    9.20 ± 3%   ~     (p=0.720 n=10+10)

name                          old p99      new p99      delta
kv0/enc=false/nodes=1/cpu=32    6.90 ±10%    6.87 ± 6%   ~     (p=1.000 n=9+9)
kv0/enc=false/nodes=3/cpu=32    14.4 ± 9%    14.4 ± 6%   ~     (p=0.707 n=10+9)
kv95/enc=false/nodes=3/cpu=32   9.79 ± 7%    9.21 ± 3% -5.90%  (p=0.015 n=10+8)
ycsb/A/nodes=3/cpu=32           78.0 ± 9%    75.5 ± 6%   ~     (p=0.305 n=10+10)
```

This patch therefore always starts the heartbeat loop for all
transactions. The txn record will only be written after about 1 second,
and only if the txn is still alive at that time, so in the typical 1PC
case it will never be written anyway.

The `EndTxn.TxnHeartbeating` field is retained for compatibility with
v21.1 nodes; otherwise, these nodes would never check for txn records
when evaluating possible 1PC writes.

Release note: None
  • Loading branch information
erikgrinaker committed Jul 9, 2021
1 parent 960d7d3 commit f7ded0b
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 40 deletions.
13 changes: 7 additions & 6 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,10 @@ func (h *txnHeartbeater) SendLocked(
ba.Txn.Key = anchor
}

// Start the heartbeat loop if it has not already started and this batch
// is not intending to commit/abort the transaction.
// Start the heartbeat loop if it has not already started.
if !h.mu.loopStarted {
if !hasET {
if err := h.startHeartbeatLoopLocked(ctx); err != nil {
return nil, roachpb.NewError(err)
}
if err := h.startHeartbeatLoopLocked(ctx); err != nil {
return nil, roachpb.NewError(err)
}
}
}
Expand All @@ -204,6 +201,10 @@ func (h *txnHeartbeater) SendLocked(
// Set the EndTxn request's TxnHeartbeating flag. Set to true if
// a heartbeat loop was started, which indicates that the transaction has
// a transaction record.
//
// TODO(erikgrinaker): In v21.2 we always heartbeat the txn record, so
// this field is never used. However, we still need to set it when
// interacting with v21.1 nodes. We can remove this field in v22.1.
et.TxnHeartbeating = h.mu.loopStarted

// Preemptively stop the heartbeat loop in case of transaction abort.
Expand Down
13 changes: 7 additions & 6 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,9 @@ func TestTxnHeartbeaterLoopStartedOnFirstLock(t *testing.T) {
})
}

// TestTxnHeartbeaterLoopNotStartedFor1PC tests that the txnHeartbeater does
// not start a heartbeat loop if it detects a 1PC transaction.
func TestTxnHeartbeaterLoopNotStartedFor1PC(t *testing.T) {
// TestTxnHeartbeaterLoopStartedFor1PC tests that the txnHeartbeater
// starts a heartbeat loop if it detects a 1PC transaction.
func TestTxnHeartbeaterLoopStartedFor1PC(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
Expand All @@ -238,7 +238,7 @@ func TestTxnHeartbeaterLoopNotStartedFor1PC(t *testing.T) {
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[1].GetInner())

etReq := ba.Requests[1].GetInner().(*roachpb.EndTxnRequest)
require.False(t, etReq.TxnHeartbeating)
require.True(t, etReq.TxnHeartbeating)

br := ba.CreateReply()
br.Txn = ba.Txn
Expand All @@ -250,8 +250,9 @@ func TestTxnHeartbeaterLoopNotStartedFor1PC(t *testing.T) {
require.NotNil(t, br)

th.mu.Lock()
require.False(t, th.mu.loopStarted)
require.False(t, th.heartbeatLoopRunningLocked())
require.True(t, th.mu.loopStarted)
require.True(t, th.heartbeatLoopRunningLocked())
th.closeLocked()
th.mu.Unlock()
}

Expand Down
7 changes: 0 additions & 7 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11266,7 +11266,6 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, true /* commit */)
et.InFlightWrites = inFlightWrites
et.TxnHeartbeating = true
return sendWrappedWithErr(etH, &et)
},
expTxn: txnWithStagingStatusAndInFlightWrites,
Expand All @@ -11279,7 +11278,6 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
},
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, false /* commit */)
et.TxnHeartbeating = true
return sendWrappedWithErr(etH, &et)
},
// The transaction record will be eagerly GC-ed.
Expand All @@ -11293,7 +11291,6 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
},
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, true /* commit */)
et.TxnHeartbeating = true
return sendWrappedWithErr(etH, &et)
},
// The transaction record will be eagerly GC-ed.
Expand All @@ -11307,7 +11304,6 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
},
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, false /* commit */)
et.TxnHeartbeating = true
return sendWrappedWithErr(etH, &et)
},
expTxn: txnWithStatus(roachpb.ABORTED),
Expand All @@ -11321,7 +11317,6 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
},
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, true /* commit */)
et.TxnHeartbeating = true
return sendWrappedWithErr(etH, &et)
},
expTxn: txnWithStatus(roachpb.COMMITTED),
Expand Down Expand Up @@ -12107,7 +12102,6 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, true /* commit */)
et.Sequence = 1 // qualify for 1PC
et.TxnHeartbeating = true
return sendWrappedWithErr(etH, &et)
},
expTxn: noTxnRecord,
Expand All @@ -12124,7 +12118,6 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, true /* commit */)
et.Sequence = 1 // qualify for 1PC
et.TxnHeartbeating = true
et.Require1PC = true
return sendWrappedWithErr(etH, &et)
},
Expand Down
23 changes: 2 additions & 21 deletions pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,22 +406,8 @@ func (r *Replica) canAttempt1PCEvaluation(
// Check whether the txn record has already been created. If so, we can't
// perform a 1PC evaluation because we need to clean up the record during
// evaluation.
//
// We only perform this check if the transaction's EndTxn indicates that it
// has started its heartbeat loop. If not, the transaction cannot have an
// existing record. However, we perform it unconditionally under race to
// catch bugs.
arg, _ := ba.GetArg(roachpb.EndTxn)
etArg := arg.(*roachpb.EndTxnRequest)
if etArg.TxnHeartbeating || util.RaceEnabled {
if ok, err := batcheval.HasTxnRecord(ctx, r.store.Engine(), ba.Txn); err != nil {
return false, roachpb.NewError(err)
} else if ok {
if !etArg.TxnHeartbeating {
log.Fatalf(ctx, "non-heartbeating txn with txn record before EndTxn: %v", ba.Txn)
}
return false, nil
}
if ok, err := batcheval.HasTxnRecord(ctx, r.store.Engine(), ba.Txn); ok || err != nil {
return false, roachpb.NewError(err)
}

// The EndTxn checks whether the txn record can be created, but we're
Expand Down Expand Up @@ -615,11 +601,6 @@ func (r *Replica) evaluate1PC(
// have acquired unreplicated locks, so inform the concurrency manager that
// it is finalized and that any unreplicated locks that it has acquired can
// be released.
//
// TODO(nvanbenschoten): once we can rely on EndTxn.TxnHeartbeating being
// correct in v21.1, we can gate these notifications on TxnHeartbeating
// because we know that a transaction hasn't acquired any unreplicated
// locks if it hasn't started heartbeating.
res.Local.UpdatedTxns = []*roachpb.Transaction{clonedTxn}
res.Local.ResolvedLocks = make([]roachpb.LockUpdate, len(etArg.LockSpans))
for i, sp := range etArg.LockSpans {
Expand Down
4 changes: 4 additions & 0 deletions pkg/roachpb/api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,10 @@ message EndTxnRequest {
//
// WARNING: this flag was introduced in the v20.2 release. v20.1 nodes may not
// set this properly so it should not be relied upon for correctness.
//
// TODO(erikgrinaker): This flag is no longer used in v21.2 as we now always
// heartbeat the txn record, but we still need to set it when interacting with
// v21.1 nodes. It should be removed in v22.1.
bool txn_heartbeating = 10;
reserved 7, 8;
}
Expand Down

0 comments on commit f7ded0b

Please sign in to comment.