Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvcoord: always start TxnCoordSender heartbeat loop #67215

Merged
merged 2 commits into from
Jul 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 36 additions & 17 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,9 @@ func (h *txnHeartbeater) SendLocked(
ba.Txn.Key = anchor
}

// Start the heartbeat loop if it has not already started and this batch
// is not intending to commit/abort the transaction.
// Start the heartbeat loop if it has not already started.
if !h.mu.loopStarted {
if !hasET {
if err := h.startHeartbeatLoopLocked(ctx); err != nil {
return nil, roachpb.NewError(err)
}
}
h.startHeartbeatLoopLocked(ctx)
}
}

Expand All @@ -204,6 +199,10 @@ func (h *txnHeartbeater) SendLocked(
// Set the EndTxn request's TxnHeartbeating flag. Set to true if
// a hearbeat loop was started which indicates that transaction has
// a transaction record.
//
// TODO(erikgrinaker): In v21.2 we always heartbeat the txn record, so
// this field is never used. However, we still need to set it when
// interacting with v21.1 nodes. We can remove this field in v22.1.
et.TxnHeartbeating = h.mu.loopStarted

// Preemptively stop the heartbeat loop in case of transaction abort.
Expand Down Expand Up @@ -282,7 +281,7 @@ func (h *txnHeartbeater) closeLocked() {
}

// startHeartbeatLoopLocked starts a heartbeat loop in a different goroutine.
func (h *txnHeartbeater) startHeartbeatLoopLocked(ctx context.Context) error {
func (h *txnHeartbeater) startHeartbeatLoopLocked(ctx context.Context) {
if h.mu.loopStarted {
log.Fatal(ctx, "attempting to start a second heartbeat loop")
}
Expand All @@ -292,16 +291,31 @@ func (h *txnHeartbeater) startHeartbeatLoopLocked(ctx context.Context) error {
// (it's zero).
h.AmbientContext.AddLogTag("txn-hb", h.mu.txn.Short())

const taskName = "[async] kv.TxnCoordSender: heartbeat loop"

// Create a new context so that the heartbeat loop doesn't inherit the
// caller's cancelation.
// We want the loop to run in a span linked to the current one, though, so we
// put our span in the new context and expect RunAsyncTask to fork it
// immediately.
hbCtx := h.AnnotateCtx(context.Background())
hbCtx = tracing.ContextWithSpan(hbCtx, tracing.SpanFromContext(ctx))
hbCtx, h.mu.loopCancel = context.WithCancel(hbCtx)

return h.stopper.RunAsyncTask(hbCtx, "kv.TxnCoordSender: heartbeat loop", h.heartbeatLoop)
hbCtx, hbCancel := context.WithCancel(h.AnnotateCtx(context.Background()))

// Delay spawning the loop goroutine until the first loopInterval passes, to
// avoid the associated cost for small write transactions. In benchmarks,
// this gave a 3% throughput increase for point writes at high concurrency.
timer := time.AfterFunc(h.loopInterval, func() {
// We want the loop to run in a span linked to the current one, so we put
// our span in the context and fork it.
var span *tracing.Span
hbCtx = tracing.ContextWithSpan(hbCtx, tracing.SpanFromContext(ctx))
hbCtx, span = tracing.ForkSpan(hbCtx, taskName)
defer span.Finish()

// Only errors on quiesce, which is safe to ignore.
_ = h.stopper.RunTask(hbCtx, taskName, h.heartbeatLoop)
})

h.mu.loopCancel = func() {
timer.Stop()
hbCancel()
}
}

func (h *txnHeartbeater) cancelHeartbeatLoopLocked() {
Expand Down Expand Up @@ -333,6 +347,11 @@ func (h *txnHeartbeater) heartbeatLoop(ctx context.Context) {
defer ticker.Stop()
}

// Loop is only spawned after loopInterval, so heartbeat immediately.
if !h.heartbeat(ctx) {
return
}

// Loop with ticker for periodic heartbeats.
for {
select {
Expand Down Expand Up @@ -448,7 +467,7 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
return true
}

// abortTxnAsyncLocked sends an EndTxn(commmit=false) asynchronously.
// abortTxnAsyncLocked sends an EndTxn(commit=false) asynchronously.
// The purpose of the async cleanup is to resolve transaction intents as soon
// as possible when a transaction coordinator observes an ABORTED transaction.
func (h *txnHeartbeater) abortTxnAsyncLocked(ctx context.Context) {
Expand Down
13 changes: 7 additions & 6 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,9 @@ func TestTxnHeartbeaterLoopStartedOnFirstLock(t *testing.T) {
})
}

// TestTxnHeartbeaterLoopNotStartedFor1PC tests that the txnHeartbeater does
// not start a heartbeat loop if it detects a 1PC transaction.
func TestTxnHeartbeaterLoopNotStartedFor1PC(t *testing.T) {
// TestTxnHeartbeaterLoopStartedFor1PC tests that the txnHeartbeater
// starts a heartbeat loop if it detects a 1PC transaction.
func TestTxnHeartbeaterLoopStartedFor1PC(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
Expand All @@ -238,7 +238,7 @@ func TestTxnHeartbeaterLoopNotStartedFor1PC(t *testing.T) {
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[1].GetInner())

etReq := ba.Requests[1].GetInner().(*roachpb.EndTxnRequest)
require.False(t, etReq.TxnHeartbeating)
require.True(t, etReq.TxnHeartbeating)

br := ba.CreateReply()
br.Txn = ba.Txn
Expand All @@ -250,8 +250,9 @@ func TestTxnHeartbeaterLoopNotStartedFor1PC(t *testing.T) {
require.NotNil(t, br)

th.mu.Lock()
require.False(t, th.mu.loopStarted)
require.False(t, th.heartbeatLoopRunningLocked())
require.True(t, th.mu.loopStarted)
require.True(t, th.heartbeatLoopRunningLocked())
th.closeLocked()
th.mu.Unlock()
}

Expand Down
16 changes: 0 additions & 16 deletions pkg/kv/kvserver/batcheval/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -195,17 +193,3 @@ func SynthesizeTxnFromMeta(
}
return synth.AsTransaction()
}

// HasTxnRecord returns whether the provided transaction has a transaction
// record. The provided reader must come from the leaseholder of the transaction
// record's Range.
func HasTxnRecord(
ctx context.Context, reader storage.Reader, txn *roachpb.Transaction,
) (bool, error) {
key := keys.TransactionKey(txn.Key, txn.ID)
val, _, err := storage.MVCCGet(ctx, reader, key, hlc.Timestamp{}, storage.MVCCGetOptions{})
if err != nil {
return false, err
}
return val != nil, nil
}
27 changes: 10 additions & 17 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4728,7 +4728,7 @@ func TestRPCRetryProtectionInTxn(t *testing.T) {
t.Fatalf("expected error, got nil")
}
require.Regexp(t,
`TransactionAbortedError\(ABORT_REASON_ALREADY_COMMITTED_OR_ROLLED_BACK_POSSIBLE_REPLAY\)`,
`TransactionAbortedError\(ABORT_REASON_RECORD_ALREADY_WRITTEN_POSSIBLE_REPLAY\)`,
pErr)
})
}
Expand Down Expand Up @@ -4910,7 +4910,7 @@ func TestBatchRetryCantCommitIntents(t *testing.T) {

// Heartbeat should fail with a TransactionAbortedError.
_, pErr = tc.SendWrappedWith(hbH, &hb)
expErr := "TransactionAbortedError(ABORT_REASON_ALREADY_COMMITTED_OR_ROLLED_BACK_POSSIBLE_REPLAY)"
expErr := "TransactionAbortedError(ABORT_REASON_RECORD_ALREADY_WRITTEN_POSSIBLE_REPLAY)"
if !testutils.IsPError(pErr, regexp.QuoteMeta(expErr)) {
t.Errorf("expected %s; got %v", expErr, pErr)
}
Expand Down Expand Up @@ -11308,7 +11308,6 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, true /* commit */)
et.InFlightWrites = inFlightWrites
et.TxnHeartbeating = true
return sendWrappedWithErr(etH, &et)
},
expTxn: txnWithStagingStatusAndInFlightWrites,
Expand All @@ -11321,7 +11320,6 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
},
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, false /* commit */)
et.TxnHeartbeating = true
return sendWrappedWithErr(etH, &et)
},
// The transaction record will be eagerly GC-ed.
Expand All @@ -11335,7 +11333,6 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
},
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, true /* commit */)
et.TxnHeartbeating = true
return sendWrappedWithErr(etH, &et)
},
// The transaction record will be eagerly GC-ed.
Expand All @@ -11349,7 +11346,6 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
},
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, false /* commit */)
et.TxnHeartbeating = true
return sendWrappedWithErr(etH, &et)
},
expTxn: txnWithStatus(roachpb.ABORTED),
Expand All @@ -11363,7 +11359,6 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
},
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, true /* commit */)
et.TxnHeartbeating = true
return sendWrappedWithErr(etH, &et)
},
expTxn: txnWithStatus(roachpb.COMMITTED),
Expand Down Expand Up @@ -11686,7 +11681,7 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
hb, hbH := heartbeatArgs(txn, now)
return sendWrappedWithErr(hbH, &hb)
},
expError: "TransactionAbortedError(ABORT_REASON_ALREADY_COMMITTED_OR_ROLLED_BACK_POSSIBLE_REPLAY)",
expError: "TransactionAbortedError(ABORT_REASON_RECORD_ALREADY_WRITTEN_POSSIBLE_REPLAY)",
expTxn: noTxnRecord,
},
{
Expand All @@ -11705,7 +11700,7 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
hb, hbH := heartbeatArgs(clone, now)
return sendWrappedWithErr(hbH, &hb)
},
expError: "TransactionAbortedError(ABORT_REASON_ALREADY_COMMITTED_OR_ROLLED_BACK_POSSIBLE_REPLAY)",
expError: "TransactionAbortedError(ABORT_REASON_RECORD_ALREADY_WRITTEN_POSSIBLE_REPLAY)",
expTxn: noTxnRecord,
},
{
Expand All @@ -11720,7 +11715,7 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
et.InFlightWrites = inFlightWrites
return sendWrappedWithErr(etH, &et)
},
expError: "TransactionAbortedError(ABORT_REASON_ALREADY_COMMITTED_OR_ROLLED_BACK_POSSIBLE_REPLAY)",
expError: "TransactionAbortedError(ABORT_REASON_RECORD_ALREADY_WRITTEN_POSSIBLE_REPLAY)",
expTxn: noTxnRecord,
},
{
Expand Down Expand Up @@ -11748,7 +11743,7 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
et, etH := endTxnArgs(txn, true /* commit */)
return sendWrappedWithErr(etH, &et)
},
expError: "TransactionAbortedError(ABORT_REASON_ALREADY_COMMITTED_OR_ROLLED_BACK_POSSIBLE_REPLAY)",
expError: "TransactionAbortedError(ABORT_REASON_RECORD_ALREADY_WRITTEN_POSSIBLE_REPLAY)",
expTxn: noTxnRecord,
},
{
Expand Down Expand Up @@ -11786,7 +11781,7 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
hb, hbH := heartbeatArgs(txn, now)
return sendWrappedWithErr(hbH, &hb)
},
expError: "TransactionAbortedError(ABORT_REASON_ALREADY_COMMITTED_OR_ROLLED_BACK_POSSIBLE_REPLAY)",
expError: "TransactionAbortedError(ABORT_REASON_RECORD_ALREADY_WRITTEN_POSSIBLE_REPLAY)",
expTxn: noTxnRecord,
},
{
Expand All @@ -11801,7 +11796,7 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
et.InFlightWrites = inFlightWrites
return sendWrappedWithErr(etH, &et)
},
expError: "TransactionAbortedError(ABORT_REASON_ALREADY_COMMITTED_OR_ROLLED_BACK_POSSIBLE_REPLAY)",
expError: "TransactionAbortedError(ABORT_REASON_RECORD_ALREADY_WRITTEN_POSSIBLE_REPLAY)",
expTxn: noTxnRecord,
},
{
Expand Down Expand Up @@ -11829,7 +11824,7 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
et, etH := endTxnArgs(txn, true /* commit */)
return sendWrappedWithErr(etH, &et)
},
expError: "TransactionAbortedError(ABORT_REASON_ALREADY_COMMITTED_OR_ROLLED_BACK_POSSIBLE_REPLAY)",
expError: "TransactionAbortedError(ABORT_REASON_RECORD_ALREADY_WRITTEN_POSSIBLE_REPLAY)",
expTxn: noTxnRecord,
},
{
Expand Down Expand Up @@ -12149,7 +12144,6 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, true /* commit */)
et.Sequence = 1 // qualify for 1PC
et.TxnHeartbeating = true
return sendWrappedWithErr(etH, &et)
},
expTxn: noTxnRecord,
Expand All @@ -12166,7 +12160,6 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, true /* commit */)
et.Sequence = 1 // qualify for 1PC
et.TxnHeartbeating = true
et.Require1PC = true
return sendWrappedWithErr(etH, &et)
},
Expand Down Expand Up @@ -12653,7 +12646,7 @@ func TestRollbackMissingTxnRecordNoError(t *testing.T) {
// a retryable TransactionAbortedError, but if there's actually a sort of
// replay at work and a client is still waiting for the error, the error would
// be transformed into something more ambiguous on the way.
expErr := "TransactionAbortedError(ABORT_REASON_ALREADY_COMMITTED_OR_ROLLED_BACK_POSSIBLE_REPLAY)"
expErr := "TransactionAbortedError(ABORT_REASON_RECORD_ALREADY_WRITTEN_POSSIBLE_REPLAY)"
if !testutils.IsPError(pErr, regexp.QuoteMeta(expErr)) {
t.Errorf("expected %s; got %v", expErr, pErr)
}
Expand Down
Loading