kv: commit-wait before running commit triggers and resolving intents
Fixes a serious bug revealed by #63747.

This commit fixes a bug revealed by kvnemesis where a range-merge watcher on the
right-hand side of a range merge could incorrectly determine that a range merge
txn had succeeded, when in reality, it had failed. The watcher would then put
the RHS leaseholder replica into a stalled state by setting `r.mu.destroyStatus`
to `destroyReasonMergePending`, effectively stalling any operation on the range
indefinitely.

The setup for this bug was that a range was operating with a `global_reads` zone
configuration attribute, so it was pushing all writers into the future. The
range was split and then rapidly merged back together. During the merge txn, a
range-merge watcher (see `maybeWatchForMergeLocked`) began monitoring the state
of the range merge txn. The problem was that at the time that the range merge
txn committed, neither the meta descriptor version written by the merge nor even
the meta descriptor version written by the split was visible to the watcher's
follow-up query. Because the watcher read below the split txn's descriptor, it
came to the wrong conclusion about the merge.
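
To make the failure mode concrete, here is a minimal sketch of a timestamp-based
read over versioned descriptor values. It is an illustration only: the `version`
type, the `readAt` helper, and all timestamps and values are hypothetical, and
the model ignores intents and uncertainty intervals. It only shows how a read at
present time can miss versions written at future timestamps and fall back to an
older one.

```go
package main

import (
	"fmt"
	"sort"
)

// version is a hypothetical stand-in for one MVCC version of a descriptor.
type version struct {
	ts    int64  // simplified timestamp (no logical component)
	value string // simplified descriptor contents
}

// readAt returns the newest value whose timestamp is at or below readTS.
// Versions above readTS are invisible to the reader.
func readAt(versions []version, readTS int64) (string, bool) {
	sorted := append([]version(nil), versions...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].ts < sorted[j].ts })
	value, found := "", false
	for _, v := range sorted {
		if v.ts > readTS {
			break
		}
		value, found = v.value, true
	}
	return value, found
}

func main() {
	// The split and merge both wrote at future timestamps (assumed values).
	descriptors := []version{
		{ts: 10, value: "pre-split"},
		{ts: 150, value: "post-split"},
		{ts: 180, value: "post-merge"},
	}
	// A reader at present time (100) sees neither the split nor the merge.
	v, ok := readAt(descriptors, 100)
	fmt.Println(v, ok)
	// A reader past both commit timestamps sees the merged descriptor.
	v, ok = readAt(descriptors, 200)
	fmt.Println(v, ok)
}
```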

It is interesting to think about what is going wrong here, because it's not
immediately obvious who is at fault. If a transaction has a commit timestamp in
the future of present time, it will need to commit-wait before acknowledging the
client. Typically, this is performed in the TxnCoordSender after the transaction
has committed and resolved its intents (see TxnCoordSender.maybeCommitWait). It
is safe to wait after a future-time transaction has committed and resolved
intents without compromising linearizability because the uncertainty interval of
concurrent and later readers ensures atomic visibility of the effects of the
transaction. In other words, all of the transaction's intents will become
visible at once and will remain visible, which is sometimes called "monotonic
reads". This is true even if the resolved intents are at a high enough timestamp
such that they are not visible to concurrent readers immediately after they are
resolved, but only become visible sometime during the writer's commit-wait
sleep. This property is central to the correctness of non-blocking transactions.
See: https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20200811_non_blocking_txns.md
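
As a rough sketch of the commit-wait rule described above, the wait is simply
the gap between the local clock and the commit timestamp, or zero if the commit
timestamp is already in the past. The `Timestamp` type and `commitWaitDuration`
helper below are simplified assumptions for illustration, not the code in
TxnCoordSender.maybeCommitWait.

```go
package main

import (
	"fmt"
	"time"
)

// Timestamp is a hypothetical, simplified stand-in for an HLC timestamp. It
// carries only a wall time in nanoseconds.
type Timestamp struct {
	WallTime int64
}

// Less reports whether t is strictly before o.
func (t Timestamp) Less(o Timestamp) bool { return t.WallTime < o.WallTime }

// commitWaitDuration returns how long a committer would need to sleep for the
// local clock reading `now` to reach commitTS. It returns zero when the commit
// timestamp is not in the future.
func commitWaitDuration(now, commitTS Timestamp) time.Duration {
	if !now.Less(commitTS) {
		return 0
	}
	return time.Duration(commitTS.WallTime - now.WallTime)
}

func main() {
	now := Timestamp{WallTime: 1000}
	// Commit timestamp in the past: no wait needed.
	fmt.Println(commitWaitDuration(now, Timestamp{WallTime: 900}))
	// Commit timestamp in the future: wait for the difference.
	fmt.Println(commitWaitDuration(now, Timestamp{WallTime: 1500}))
}
```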

However, if a transaction has a commit trigger, the side-effects of the trigger
will go into effect immediately upon applying the corresponding Raft log entry.
This poses a problem, because we do not want part of a transaction's effects
(e.g. its commit trigger) to become visible to onlookers before the rest of its
effects do (e.g. its intent writes).

To avoid this problem, this commit adds special server-side logic to perform the
commit-wait stage of a transaction with a commit trigger early, before its
EndTxn evaluates and its commit trigger fires. This results in the transaction
waiting longer to commit, run its commit trigger, and resolve its intents, but
it is otherwise safe and effective.
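
The ordering can be sketched roughly as follows. This is not the implementation
of maybeCommitWaitBeforeCommitTrigger; the `manualClock`, `endTxn`,
`maybeWaitBeforeTrigger`, and `evalEndTxn` names are hypothetical, and the clock
is advanced manually instead of sleeping. The sketch shows the two halves of the
approach: a wait that runs before evaluation, and an assertion during evaluation
that rejects a commit trigger whose commit timestamp is still in the future.

```go
package main

import (
	"errors"
	"fmt"
)

// manualClock is a hypothetical, manually advanced clock (nanoseconds).
type manualClock struct{ now int64 }

func (c *manualClock) Now() int64 { return c.now }

// SleepUntil simulates a sleep by advancing the clock to ts if needed.
func (c *manualClock) SleepUntil(ts int64) {
	if c.now < ts {
		c.now = ts
	}
}

// endTxn is a simplified stand-in for an EndTxn request.
type endTxn struct {
	commitTS         int64
	hasCommitTrigger bool
}

var errNeedsCommitWait = errors.New("commit trigger needs commit wait")

// maybeWaitBeforeTrigger waits only for requests that carry a commit trigger
// and a future-time commit timestamp. It reports whether it waited.
func maybeWaitBeforeTrigger(c *manualClock, req endTxn) bool {
	if !req.hasCommitTrigger || req.commitTS <= c.Now() {
		return false
	}
	c.SleepUntil(req.commitTS)
	return true
}

// evalEndTxn models the evaluation-time assertion: a commit trigger must not
// run while the commit timestamp is ahead of the local clock.
func evalEndTxn(c *manualClock, req endTxn) error {
	if req.hasCommitTrigger && c.Now() < req.commitTS {
		return errNeedsCommitWait
	}
	return nil
}

func main() {
	clock := &manualClock{now: 100}
	req := endTxn{commitTS: 200, hasCommitTrigger: true}

	// Evaluating without waiting first trips the assertion.
	fmt.Println("without wait:", evalEndTxn(clock, req))

	// Waiting first lets the trigger run with its timestamp in the past.
	waited := maybeWaitBeforeTrigger(clock, req)
	fmt.Println("waited:", waited, "then eval:", evalEndTxn(clock, req))
}
```

A standard transaction without a commit trigger skips the server-side wait in
this sketch and would still commit-wait on the client.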

Interestingly, this is quite similar to how Spanner handles its commit-wait rule:

> Before allowing any coordinator replica to apply the commit record, the
> coordinator leader waits until TT.after(s), so as to obey the commit-wait rule
> described in Section 4.1.2. Because the coordinator leader chose s based on
> TT.now().latest, and now waits until that timestamp is guaranteed to be in the
> past, the expected wait is at least 2 ∗ ε̄. This wait is typically overlapped with
> Paxos communication. After commit wait, the coordinator sends the commit
> timestamp to the client and all other participant leaders. Each participant
> leader logs the transaction’s outcome through Paxos. All participants apply at
> the same timestamp and then release locks.

Of course, the whole point of non-blocking transactions is that we release locks
early and use clocks (through uncertainty intervals + a reader-side commit-wait
rule) to enforce consistency, so we don't want to make this change for standard
transactions.

Before this change, I could hit the bug in about 5 minutes of stressing
kvnemesis on a roachprod cluster. After this change, I've been able to run
kvnemesis for a few hours without issue.

Release note (bug fix): Fixed a rare bug present in betas where rapid range
splits and merges on a GLOBAL table could lead to a stuck leaseholder replica.
The situation is no longer possible.
nvanbenschoten committed Apr 27, 2021
1 parent cfb4b95 commit 776076b
Showing 10 changed files with 300 additions and 10 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_metrics.go
@@ -75,7 +75,7 @@ var (
Measurement: "KV Transactions",
Unit: metric.Unit_COUNT,
}
metaCommitWaitRates = metric.Metadata{
metaCommitWaitCount = metric.Metadata{
Name: "txn.commit_waits",
Help: "Number of KV transactions that had to commit-wait on commit " +
"in order to ensure linearizability. This generally happens to " +
@@ -235,7 +235,7 @@ func MakeTxnMetrics(histogramWindow time.Duration) TxnMetrics {
Commits: metric.NewCounter(metaCommitsRates),
Commits1PC: metric.NewCounter(metaCommits1PCRates),
ParallelCommits: metric.NewCounter(metaParallelCommitsRates),
CommitWaits: metric.NewCounter(metaCommitWaitRates),
CommitWaits: metric.NewCounter(metaCommitWaitCount),
RefreshSuccess: metric.NewCounter(metaRefreshSuccess),
RefreshFail: metric.NewCounter(metaRefreshFail),
RefreshFailWithCondensedSpans: metric.NewCounter(metaRefreshFailWithCondensedSpans),
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/abortspan/abortspan.go
@@ -183,7 +183,7 @@ func (sc *AbortSpan) CopyTo(
hlc.Timestamp{}, nil, &entry,
)
}); err != nil {
return roachpb.NewReplicaCorruptionError(errors.Wrap(err, "AbortSpan.CopyTo"))
return errors.Wrap(err, "AbortSpan.CopyTo")
}
log.Eventf(ctx, "abort span: copied %d entries, skipped %d", abortSpanCopyCount, abortSpanSkipCount)
return nil
32 changes: 28 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
@@ -366,7 +366,7 @@ func EndTxn(
ctx, cArgs.EvalCtx, readWriter.(storage.Batch), ms, args, reply.Txn,
)
if err != nil {
return result.Result{}, roachpb.NewReplicaCorruptionError(err)
return result.Result{}, err
}
if err := txnResult.MergeAndDestroy(triggerResult); err != nil {
return result.Result{}, err
@@ -634,15 +634,39 @@ func RunCommitTrigger(
return result.Result{}, nil
}

// The transaction is committing with a commit trigger. This means that it has
// side-effects beyond those of the intents that it has written.

// The transaction should not have a commit timestamp in the future of present
// time, even if its commit timestamp is synthetic. Such cases should be
// caught in maybeCommitWaitBeforeCommitTrigger before getting here, which
// should sleep for long enough to ensure that the local clock leads the
// commit timestamp. An error here may indicate that the transaction's commit
// timestamp was bumped after it acquired latches.
if txn.WriteTimestamp.Synthetic && rec.Clock().Now().Less(txn.WriteTimestamp) {
return result.Result{}, errors.AssertionFailedf("txn %s with %s commit trigger needs "+
"commit wait. Was its timestamp bumped after acquiring latches?", txn, errors.Safe(ct.Kind()))
}

// Stage the commit trigger's side-effects so that they will go into effect on
// each Replica when the corresponding Raft log entry is applied. Only one
// commit trigger can be set.
if ct.GetSplitTrigger() != nil {
newMS, trigger, err := splitTrigger(
newMS, res, err := splitTrigger(
ctx, rec, batch, *ms, ct.SplitTrigger, txn.WriteTimestamp,
)
if err != nil {
return result.Result{}, roachpb.NewReplicaCorruptionError(err)
}
*ms = newMS
return trigger, err
return res, nil
}
if mt := ct.GetMergeTrigger(); mt != nil {
return mergeTrigger(ctx, rec, batch, ms, mt, txn.WriteTimestamp)
res, err := mergeTrigger(ctx, rec, batch, ms, mt, txn.WriteTimestamp)
if err != nil {
return result.Result{}, roachpb.NewReplicaCorruptionError(err)
}
return res, nil
}
if crt := ct.GetChangeReplicasTrigger(); crt != nil {
// TODO(tbg): once we support atomic replication changes, check that
93 changes: 93 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go
@@ -14,6 +14,7 @@ import (
"context"
"regexp"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
@@ -1063,3 +1064,95 @@ func TestPartialRollbackOnEndTransaction(t *testing.T) {
}
})
}

// TestCommitWaitBeforeIntentResolutionIfCommitTrigger tests that an EndTxn that carries a
// commit trigger and needs to commit-wait because it has a commit timestamp in
// the future will return an assertion error. Such situations should trigger a
// higher-level hook (maybeCommitWaitBeforeCommitTrigger) to perform the commit
// wait sleep before the request acquires latches and begins evaluating.
func TestCommitWaitBeforeIntentResolutionIfCommitTrigger(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunTrueAndFalse(t, "commitTrigger", func(t *testing.T, commitTrigger bool) {
for _, cfg := range []struct {
name string
commitTS func(now hlc.Timestamp) hlc.Timestamp
expError bool
}{
{
name: "past",
commitTS: func(now hlc.Timestamp) hlc.Timestamp { return now },
expError: false,
},
{
name: "past-syn",
commitTS: func(now hlc.Timestamp) hlc.Timestamp { return now.WithSynthetic(true) },
expError: false,
},
{
name: "future-syn",
commitTS: func(now hlc.Timestamp) hlc.Timestamp { return now.Add(100, 0).WithSynthetic(true) },
// If the EndTxn carried a commit trigger and its transaction will need
// to commit-wait because the transaction has a future-time commit
// timestamp, evaluating the request should return an error.
expError: commitTrigger,
},
} {
t.Run(cfg.name, func(t *testing.T) {
ctx := context.Background()
db := storage.NewDefaultInMemForTesting()
defer db.Close()
batch := db.NewBatch()
defer batch.Close()

manual := hlc.NewManualClock(123)
clock := hlc.NewClock(manual.UnixNano, time.Nanosecond)
desc := roachpb.RangeDescriptor{
RangeID: 99,
StartKey: roachpb.RKey("a"),
EndKey: roachpb.RKey("z"),
}

now := clock.Now()
commitTS := cfg.commitTS(now)
txn := roachpb.MakeTransaction("test", desc.StartKey.AsRawKey(), 0, now, 0)
txn.ReadTimestamp = commitTS
txn.WriteTimestamp = commitTS

// Issue the end txn command.
req := roachpb.EndTxnRequest{
RequestHeader: roachpb.RequestHeader{Key: txn.Key},
Commit: true,
}
if commitTrigger {
req.InternalCommitTrigger = &roachpb.InternalCommitTrigger{
ModifiedSpanTrigger: &roachpb.ModifiedSpanTrigger{SystemConfigSpan: true},
}
}
var resp roachpb.EndTxnResponse
_, err := EndTxn(ctx, batch, CommandArgs{
EvalCtx: (&MockEvalCtx{
Desc: &desc,
Clock: clock,
CanCreateTxn: func() (bool, hlc.Timestamp, roachpb.TransactionAbortedReason) {
return true, hlc.Timestamp{}, 0
},
}).EvalContext(),
Args: &req,
Header: roachpb.Header{
Timestamp: commitTS,
Txn: &txn,
},
}, &resp)

if cfg.expError {
require.Error(t, err)
require.Regexp(t, `txn .* with modified-span \(system-config\) commit trigger needs commit wait`, err)
} else {
require.NoError(t, err)
}
})
}
})
}
45 changes: 43 additions & 2 deletions pkg/kv/kvserver/client_split_test.go
@@ -3512,16 +3512,48 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Detect splits and merges over the global read ranges. Assert that the split
// and merge transactions commit with synthetic timestamps, and that the
// commit-wait sleep for these transactions is performed before running their
// commit triggers instead of on the kv client. For details on why this is
// necessary, see maybeCommitWaitBeforeCommitTrigger.
var clock atomic.Value
var splitsWithSyntheticTS, mergesWithSyntheticTS int64
respFilter := func(ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
if req, ok := ba.GetArg(roachpb.EndTxn); ok {
endTxn := req.(*roachpb.EndTxnRequest)
if br.Txn.Status == roachpb.COMMITTED && br.Txn.WriteTimestamp.Synthetic {
if ct := endTxn.InternalCommitTrigger; ct != nil {
// The server-side commit-wait sleep should ensure that the commit
// triggers are only run after the commit timestamp is below present
// time.
now := clock.Load().(*hlc.Clock).Now()
require.True(t, br.Txn.WriteTimestamp.Less(now))

switch {
case ct.SplitTrigger != nil:
atomic.AddInt64(&splitsWithSyntheticTS, 1)
case ct.MergeTrigger != nil:
atomic.AddInt64(&mergesWithSyntheticTS, 1)
}
}
}
}
return nil
}

ctx := context.Background()
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
DisableMergeQueue: true,
TestingResponseFilter: respFilter,
},
},
})
s := serv.(*server.TestServer)
defer s.Stopper().Stop(ctx)
clock.Store(s.Clock())
store, err := s.Stores().GetStore(s.GetFirstStoreID())
require.NoError(t, err)
config.TestingSetupZoneConfigHook(s.Stopper())
@@ -3543,11 +3575,18 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
return nil
})

// Write to the range, which has the effect of bumping the closed timestamp.
pArgs := putArgs(descKey, []byte("foo"))
_, pErr := kv.SendWrapped(ctx, store.TestSender(), pArgs)
require.Nil(t, pErr)

// Split the range. Should succeed.
splitKey := append(descKey, []byte("split")...)
splitArgs := adminSplitArgs(splitKey)
_, pErr := kv.SendWrapped(ctx, store.TestSender(), splitArgs)
_, pErr = kv.SendWrapped(ctx, store.TestSender(), splitArgs)
require.Nil(t, pErr)
require.Equal(t, int64(1), store.Metrics().CommitWaitsBeforeCommitTrigger.Count())
require.Equal(t, int64(1), atomic.LoadInt64(&splitsWithSyntheticTS))

repl := store.LookupReplica(roachpb.RKey(splitKey))
require.Equal(t, splitKey, repl.Desc().StartKey.AsRawKey())
@@ -3556,6 +3595,8 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
mergeArgs := adminMergeArgs(descKey)
_, pErr = kv.SendWrapped(ctx, store.TestSender(), mergeArgs)
require.Nil(t, pErr)
require.Equal(t, int64(2), store.Metrics().CommitWaitsBeforeCommitTrigger.Count())
require.Equal(t, int64(1), atomic.LoadInt64(&mergesWithSyntheticTS))

repl = store.LookupReplica(roachpb.RKey(splitKey))
require.Equal(t, descKey, repl.Desc().StartKey.AsRawKey())
15 changes: 15 additions & 0 deletions pkg/kv/kvserver/metrics.go
@@ -294,6 +294,15 @@ var (
Unit: metric.Unit_COUNT,
}

// Server-side transaction metrics.
metaCommitWaitBeforeCommitTriggerCount = metric.Metadata{
Name: "txn.commit_waits.before_commit_trigger",
Help: "Number of KV transactions that had to commit-wait on the server " +
"before committing because they had a commit trigger",
Measurement: "KV Transactions",
Unit: metric.Unit_COUNT,
}

// RocksDB metrics.
metaRdbBlockCacheHits = metric.Metadata{
Name: "rocksdb.block.cache.hits",
@@ -1084,6 +1093,9 @@ type StoreMetrics struct {
// Follower read metrics.
FollowerReadsCount *metric.Counter

// Server-side transaction metrics.
CommitWaitsBeforeCommitTrigger *metric.Counter

// RocksDB metrics.
RdbBlockCacheHits *metric.Gauge
RdbBlockCacheMisses *metric.Gauge
@@ -1455,6 +1467,9 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
// Follower reads metrics.
FollowerReadsCount: metric.NewCounter(metaFollowerReadsCount),

// Server-side transaction metrics.
CommitWaitsBeforeCommitTrigger: metric.NewCounter(metaCommitWaitBeforeCommitTriggerCount),

// RocksDB metrics.
RdbBlockCacheHits: metric.NewGauge(metaRdbBlockCacheHits),
RdbBlockCacheMisses: metric.NewGauge(metaRdbBlockCacheMisses),