Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: commit-wait before running commit triggers and resolving intents #63971

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,9 @@ func (tc *TxnCoordSender) finalizeNonLockingTxnLocked(
}
tc.finalizeAndCleanupTxnLocked(ctx)
if et.Commit {
tc.maybeCommitWait(ctx, false /* deferred */)
if err := tc.maybeCommitWait(ctx, false /* deferred */); err != nil {
return roachpb.NewError(err)
}
}
return nil
}
Expand Down Expand Up @@ -519,7 +521,9 @@ func (tc *TxnCoordSender) Send(
if (et.Commit && pErr == nil) || !et.Commit {
tc.finalizeAndCleanupTxnLocked(ctx)
if et.Commit {
tc.maybeCommitWait(ctx, false /* deferred */)
if err := tc.maybeCommitWait(ctx, false /* deferred */); err != nil {
return nil, roachpb.NewError(err)
}
}
}
}
Expand Down Expand Up @@ -587,15 +591,15 @@ func (tc *TxnCoordSender) Send(
//
// For more, see https://www.cockroachlabs.com/blog/consistency-model/ and
// docs/RFCS/20200811_non_blocking_txns.md.
func (tc *TxnCoordSender) maybeCommitWait(ctx context.Context, deferred bool) {
func (tc *TxnCoordSender) maybeCommitWait(ctx context.Context, deferred bool) error {
if tc.mu.txn.Status != roachpb.COMMITTED {
log.Fatalf(ctx, "maybeCommitWait called when not committed")
}
if tc.mu.commitWaitDeferred && !deferred {
// If this is an automatic commit-wait call and the user of this
// transaction has opted to defer the commit-wait and handle it
// externally, there's nothing to do yet.
return
return nil
}

commitTS := tc.mu.txn.WriteTimestamp
Expand All @@ -606,7 +610,7 @@ func (tc *TxnCoordSender) maybeCommitWait(ctx context.Context, deferred bool) {
// No need to wait. If !Synthetic then we know the commit timestamp
// leads the local HLC clock, and since that's all we'd need to wait
// for, we can short-circuit.
return
return nil
}

waitUntil := commitTS
Expand All @@ -620,12 +624,16 @@ func (tc *TxnCoordSender) maybeCommitWait(ctx context.Context, deferred bool) {

// NB: unlock while sleeping to avoid holding the lock for commit-wait.
tc.mu.Unlock()
tc.clock.SleepUntil(waitUntil)
err := tc.clock.SleepUntil(ctx, waitUntil)
tc.mu.Lock()
if err != nil {
return err
}

after := tc.clock.PhysicalTime()
log.VEventf(ctx, 2, "completed commit-wait sleep, took %s", after.Sub(before))
tc.metrics.CommitWaits.Inc(1)
return nil
}

// maybeRejectClientLocked checks whether the transaction is in a state that
Expand Down Expand Up @@ -1259,17 +1267,17 @@ func (tc *TxnCoordSender) ManualRefresh(ctx context.Context) error {
}

// DeferCommitWait is part of the TxnSender interface.
func (tc *TxnCoordSender) DeferCommitWait(ctx context.Context) func(context.Context) {
func (tc *TxnCoordSender) DeferCommitWait(ctx context.Context) func(context.Context) error {
tc.mu.Lock()
defer tc.mu.Unlock()
tc.mu.commitWaitDeferred = true
return func(ctx context.Context) {
return func(ctx context.Context) error {
tc.mu.Lock()
defer tc.mu.Unlock()
if tc.mu.txn.Status != roachpb.COMMITTED {
// If transaction has not committed, there's nothing to do.
return
return nil
}
tc.maybeCommitWait(ctx, true /* deferred */)
return tc.maybeCommitWait(ctx, true /* deferred */)
}
}
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1198,7 +1198,7 @@ func TestTxnCommitWait(t *testing.T) {
deferredWaitC := make(chan struct{})
errC := make(chan error, 1)
go func() {
var commitWaitFn func(context.Context)
var commitWaitFn func(context.Context) error
if deferred {
// If the test wants the caller to assume responsibility for commit
// waiting, we expect the transaction to return immediately after a
Expand Down Expand Up @@ -1246,7 +1246,7 @@ func TestTxnCommitWait(t *testing.T) {

if commitWaitFn != nil {
close(deferredWaitC)
commitWaitFn(ctx) // NOTE: blocks
_ = commitWaitFn(ctx) // NOTE: blocks
}

errC <- err
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ var (
Measurement: "KV Transactions",
Unit: metric.Unit_COUNT,
}
metaCommitWaitRates = metric.Metadata{
metaCommitWaitCount = metric.Metadata{
Name: "txn.commit_waits",
Help: "Number of KV transactions that had to commit-wait on commit " +
"in order to ensure linearizability. This generally happens to " +
Expand Down Expand Up @@ -235,7 +235,7 @@ func MakeTxnMetrics(histogramWindow time.Duration) TxnMetrics {
Commits: metric.NewCounter(metaCommitsRates),
Commits1PC: metric.NewCounter(metaCommits1PCRates),
ParallelCommits: metric.NewCounter(metaParallelCommitsRates),
CommitWaits: metric.NewCounter(metaCommitWaitRates),
CommitWaits: metric.NewCounter(metaCommitWaitCount),
RefreshSuccess: metric.NewCounter(metaRefreshSuccess),
RefreshFail: metric.NewCounter(metaRefreshFail),
RefreshFailWithCondensedSpans: metric.NewCounter(metaRefreshFailWithCondensedSpans),
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/abortspan/abortspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (sc *AbortSpan) CopyTo(
hlc.Timestamp{}, nil, &entry,
)
}); err != nil {
return roachpb.NewReplicaCorruptionError(errors.Wrap(err, "AbortSpan.CopyTo"))
return errors.Wrap(err, "AbortSpan.CopyTo")
}
log.Eventf(ctx, "abort span: copied %d entries, skipped %d", abortSpanCopyCount, abortSpanSkipCount)
return nil
Expand Down
32 changes: 28 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func EndTxn(
ctx, cArgs.EvalCtx, readWriter.(storage.Batch), ms, args, reply.Txn,
)
if err != nil {
return result.Result{}, roachpb.NewReplicaCorruptionError(err)
return result.Result{}, err
}
if err := txnResult.MergeAndDestroy(triggerResult); err != nil {
return result.Result{}, err
Expand Down Expand Up @@ -634,15 +634,39 @@ func RunCommitTrigger(
return result.Result{}, nil
}

// The transaction is committing with a commit trigger. This means that it has
// side-effects beyond those of the intents that it has written.

// The transaction should not have a commit timestamp in the future of present
// time, even it its commit timestamp is synthetic. Such cases should be
// caught in maybeCommitWaitBeforeCommitTrigger before getting here, which
// should sleep for long enough to ensure that the local clock leads the
// commit timestamp. An error here may indicate that the transaction's commit
// timestamp was bumped after it acquired latches.
if txn.WriteTimestamp.Synthetic && rec.Clock().Now().Less(txn.WriteTimestamp) {
return result.Result{}, errors.AssertionFailedf("txn %s with %s commit trigger needs "+
"commit wait. Was its timestamp bumped after acquiring latches?", txn, errors.Safe(ct.Kind()))
}

// Stage the commit trigger's side-effects so that they will go into effect on
// each Replica when the corresponding Raft log entry is applied. Only one
// commit trigger can be set.
if ct.GetSplitTrigger() != nil {
newMS, trigger, err := splitTrigger(
newMS, res, err := splitTrigger(
ctx, rec, batch, *ms, ct.SplitTrigger, txn.WriteTimestamp,
)
if err != nil {
return result.Result{}, roachpb.NewReplicaCorruptionError(err)
}
*ms = newMS
return trigger, err
return res, nil
}
if mt := ct.GetMergeTrigger(); mt != nil {
return mergeTrigger(ctx, rec, batch, ms, mt, txn.WriteTimestamp)
res, err := mergeTrigger(ctx, rec, batch, ms, mt, txn.WriteTimestamp)
if err != nil {
return result.Result{}, roachpb.NewReplicaCorruptionError(err)
}
return res, nil
}
if crt := ct.GetChangeReplicasTrigger(); crt != nil {
// TODO(tbg): once we support atomic replication changes, check that
Expand Down
93 changes: 93 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"regexp"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
Expand Down Expand Up @@ -1063,3 +1064,95 @@ func TestPartialRollbackOnEndTransaction(t *testing.T) {
}
})
}

// TestAssertNoCommitWaitIfCommitTrigger tests that an EndTxn that carries a
// commit trigger and needs to commit-wait because it has a commit timestamp in
// the future will return an assertion error. Such situations should trigger a
// higher-level hook (maybeCommitWaitBeforeCommitTrigger) to perform the commit
// wait sleep before the request acquires latches and begins evaluating.
func TestCommitWaitBeforeIntentResolutionIfCommitTrigger(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunTrueAndFalse(t, "commitTrigger", func(t *testing.T, commitTrigger bool) {
for _, cfg := range []struct {
name string
commitTS func(now hlc.Timestamp) hlc.Timestamp
expError bool
}{
{
name: "past",
commitTS: func(now hlc.Timestamp) hlc.Timestamp { return now },
expError: false,
},
{
name: "past-syn",
commitTS: func(now hlc.Timestamp) hlc.Timestamp { return now.WithSynthetic(true) },
expError: false,
},
{
name: "future-syn",
commitTS: func(now hlc.Timestamp) hlc.Timestamp { return now.Add(100, 0).WithSynthetic(true) },
// If the EndTxn carried a commit trigger and its transaction will need
// to commit-wait because the transaction has a future-time commit
// timestamp, evaluating the request should return an error.
expError: commitTrigger,
},
} {
t.Run(cfg.name, func(t *testing.T) {
ctx := context.Background()
db := storage.NewDefaultInMemForTesting()
defer db.Close()
batch := db.NewBatch()
defer batch.Close()

manual := hlc.NewManualClock(123)
clock := hlc.NewClock(manual.UnixNano, time.Nanosecond)
desc := roachpb.RangeDescriptor{
RangeID: 99,
StartKey: roachpb.RKey("a"),
EndKey: roachpb.RKey("z"),
}

now := clock.Now()
commitTS := cfg.commitTS(now)
txn := roachpb.MakeTransaction("test", desc.StartKey.AsRawKey(), 0, now, 0)
txn.ReadTimestamp = commitTS
txn.WriteTimestamp = commitTS

// Issue the end txn command.
req := roachpb.EndTxnRequest{
RequestHeader: roachpb.RequestHeader{Key: txn.Key},
Commit: true,
}
if commitTrigger {
req.InternalCommitTrigger = &roachpb.InternalCommitTrigger{
ModifiedSpanTrigger: &roachpb.ModifiedSpanTrigger{SystemConfigSpan: true},
}
}
var resp roachpb.EndTxnResponse
_, err := EndTxn(ctx, batch, CommandArgs{
EvalCtx: (&MockEvalCtx{
Desc: &desc,
Clock: clock,
CanCreateTxn: func() (bool, hlc.Timestamp, roachpb.TransactionAbortedReason) {
return true, hlc.Timestamp{}, 0
},
}).EvalContext(),
Args: &req,
Header: roachpb.Header{
Timestamp: commitTS,
Txn: &txn,
},
}, &resp)

if cfg.expError {
require.Error(t, err)
require.Regexp(t, `txn .* with modified-span \(system-config\) commit trigger needs commit wait`, err)
} else {
require.NoError(t, err)
}
})
}
})
}
45 changes: 43 additions & 2 deletions pkg/kv/kvserver/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3512,16 +3512,48 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Detect splits and merges over the global read ranges. Assert that the split
// and merge transactions commit with synthetic timestamps, and that the
// commit-wait sleep for these transactions is performed before running their
// commit triggers instead of run on the kv client. For details on why this is
// necessary, see maybeCommitWaitBeforeCommitTrigger.
var clock atomic.Value
var splitsWithSyntheticTS, mergesWithSyntheticTS int64
respFilter := func(ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
if req, ok := ba.GetArg(roachpb.EndTxn); ok {
endTxn := req.(*roachpb.EndTxnRequest)
if br.Txn.Status == roachpb.COMMITTED && br.Txn.WriteTimestamp.Synthetic {
if ct := endTxn.InternalCommitTrigger; ct != nil {
// The server-side commit-wait sleep should ensure that the commit
// triggers are only run after the commit timestamp is below present
// time.
now := clock.Load().(*hlc.Clock).Now()
require.True(t, br.Txn.WriteTimestamp.Less(now))

switch {
case ct.SplitTrigger != nil:
atomic.AddInt64(&splitsWithSyntheticTS, 1)
case ct.MergeTrigger != nil:
atomic.AddInt64(&mergesWithSyntheticTS, 1)
}
}
}
}
return nil
}

ctx := context.Background()
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
DisableMergeQueue: true,
TestingResponseFilter: respFilter,
},
},
})
s := serv.(*server.TestServer)
defer s.Stopper().Stop(ctx)
clock.Store(s.Clock())
store, err := s.Stores().GetStore(s.GetFirstStoreID())
require.NoError(t, err)
config.TestingSetupZoneConfigHook(s.Stopper())
Expand All @@ -3543,11 +3575,18 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
return nil
})

// Write to the range, which has the effect of bumping the closed timestamp.
pArgs := putArgs(descKey, []byte("foo"))
_, pErr := kv.SendWrapped(ctx, store.TestSender(), pArgs)
require.Nil(t, pErr)

// Split the range. Should succeed.
splitKey := append(descKey, []byte("split")...)
splitArgs := adminSplitArgs(splitKey)
_, pErr := kv.SendWrapped(ctx, store.TestSender(), splitArgs)
_, pErr = kv.SendWrapped(ctx, store.TestSender(), splitArgs)
require.Nil(t, pErr)
require.Equal(t, int64(1), store.Metrics().CommitWaitsBeforeCommitTrigger.Count())
require.Equal(t, int64(1), atomic.LoadInt64(&splitsWithSyntheticTS))

repl := store.LookupReplica(roachpb.RKey(splitKey))
require.Equal(t, splitKey, repl.Desc().StartKey.AsRawKey())
Expand All @@ -3556,6 +3595,8 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
mergeArgs := adminMergeArgs(descKey)
_, pErr = kv.SendWrapped(ctx, store.TestSender(), mergeArgs)
require.Nil(t, pErr)
require.Equal(t, int64(2), store.Metrics().CommitWaitsBeforeCommitTrigger.Count())
require.Equal(t, int64(1), atomic.LoadInt64(&mergesWithSyntheticTS))

repl = store.LookupReplica(roachpb.RKey(splitKey))
require.Equal(t, descKey, repl.Desc().StartKey.AsRawKey())
Expand Down
Loading