diff --git a/pkg/kv/kvclient/kvcoord/txn_metrics.go b/pkg/kv/kvclient/kvcoord/txn_metrics.go index 147b2e05ba6c..64c2af60da1d 100644 --- a/pkg/kv/kvclient/kvcoord/txn_metrics.go +++ b/pkg/kv/kvclient/kvcoord/txn_metrics.go @@ -75,7 +75,7 @@ var ( Measurement: "KV Transactions", Unit: metric.Unit_COUNT, } - metaCommitWaitRates = metric.Metadata{ + metaCommitWaitCount = metric.Metadata{ Name: "txn.commit_waits", Help: "Number of KV transactions that had to commit-wait on commit " + "in order to ensure linearizability. This generally happens to " + @@ -235,7 +235,7 @@ func MakeTxnMetrics(histogramWindow time.Duration) TxnMetrics { Commits: metric.NewCounter(metaCommitsRates), Commits1PC: metric.NewCounter(metaCommits1PCRates), ParallelCommits: metric.NewCounter(metaParallelCommitsRates), - CommitWaits: metric.NewCounter(metaCommitWaitRates), + CommitWaits: metric.NewCounter(metaCommitWaitCount), RefreshSuccess: metric.NewCounter(metaRefreshSuccess), RefreshFail: metric.NewCounter(metaRefreshFail), RefreshFailWithCondensedSpans: metric.NewCounter(metaRefreshFailWithCondensedSpans), diff --git a/pkg/kv/kvserver/abortspan/abortspan.go b/pkg/kv/kvserver/abortspan/abortspan.go index b576b219f2e8..357e51d3f205 100644 --- a/pkg/kv/kvserver/abortspan/abortspan.go +++ b/pkg/kv/kvserver/abortspan/abortspan.go @@ -183,7 +183,7 @@ func (sc *AbortSpan) CopyTo( hlc.Timestamp{}, nil, &entry, ) }); err != nil { - return roachpb.NewReplicaCorruptionError(errors.Wrap(err, "AbortSpan.CopyTo")) + return errors.Wrap(err, "AbortSpan.CopyTo") } log.Eventf(ctx, "abort span: copied %d entries, skipped %d", abortSpanCopyCount, abortSpanSkipCount) return nil diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go index d43bcd90d607..0573d3f68b5a 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go @@ -366,7 +366,7 @@ func EndTxn( ctx, cArgs.EvalCtx, 
readWriter.(storage.Batch), ms, args, reply.Txn, ) if err != nil { - return result.Result{}, roachpb.NewReplicaCorruptionError(err) + return result.Result{}, err } if err := txnResult.MergeAndDestroy(triggerResult); err != nil { return result.Result{}, err @@ -634,15 +634,39 @@ func RunCommitTrigger( return result.Result{}, nil } + // The transaction is committing with a commit trigger. This means that it has + // side-effects beyond those of the intents that it has written. + + // The transaction should not have a commit timestamp in the future of present + // time, even if its commit timestamp is synthetic. Such cases should be + // caught in maybeCommitWaitBeforeCommitTrigger before getting here, which + // should sleep for long enough to ensure that the local clock leads the + // commit timestamp. An error here may indicate that the transaction's commit + // timestamp was bumped after it acquired latches. + if txn.WriteTimestamp.Synthetic && rec.Clock().Now().Less(txn.WriteTimestamp) { + return result.Result{}, errors.AssertionFailedf("txn %s with %s commit trigger needs "+ + "commit wait. Was its timestamp bumped after acquiring latches?", txn, errors.Safe(ct.Kind())) + } + + // Stage the commit trigger's side-effects so that they will go into effect on + // each Replica when the corresponding Raft log entry is applied. Only one + // commit trigger can be set. 
if ct.GetSplitTrigger() != nil { - newMS, trigger, err := splitTrigger( + newMS, res, err := splitTrigger( ctx, rec, batch, *ms, ct.SplitTrigger, txn.WriteTimestamp, ) + if err != nil { + return result.Result{}, roachpb.NewReplicaCorruptionError(err) + } *ms = newMS - return trigger, err + return res, nil } if mt := ct.GetMergeTrigger(); mt != nil { - return mergeTrigger(ctx, rec, batch, ms, mt, txn.WriteTimestamp) + res, err := mergeTrigger(ctx, rec, batch, ms, mt, txn.WriteTimestamp) + if err != nil { + return result.Result{}, roachpb.NewReplicaCorruptionError(err) + } + return res, nil } if crt := ct.GetChangeReplicasTrigger(); crt != nil { // TODO(tbg): once we support atomic replication changes, check that diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go index 4859e64f0a7e..026db59a83bf 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go @@ -14,6 +14,7 @@ import ( "context" "regexp" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan" @@ -1063,3 +1064,95 @@ func TestPartialRollbackOnEndTransaction(t *testing.T) { } }) } + +// TestCommitWaitBeforeIntentResolutionIfCommitTrigger tests that an EndTxn that carries a +// commit trigger and needs to commit-wait because it has a commit timestamp in +// the future will return an assertion error. Such situations should trigger a +// higher-level hook (maybeCommitWaitBeforeCommitTrigger) to perform the commit +// wait sleep before the request acquires latches and begins evaluating. 
+func TestCommitWaitBeforeIntentResolutionIfCommitTrigger(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testutils.RunTrueAndFalse(t, "commitTrigger", func(t *testing.T, commitTrigger bool) { + for _, cfg := range []struct { + name string + commitTS func(now hlc.Timestamp) hlc.Timestamp + expError bool + }{ + { + name: "past", + commitTS: func(now hlc.Timestamp) hlc.Timestamp { return now }, + expError: false, + }, + { + name: "past-syn", + commitTS: func(now hlc.Timestamp) hlc.Timestamp { return now.WithSynthetic(true) }, + expError: false, + }, + { + name: "future-syn", + commitTS: func(now hlc.Timestamp) hlc.Timestamp { return now.Add(100, 0).WithSynthetic(true) }, + // If the EndTxn carried a commit trigger and its transaction will need + // to commit-wait because the transaction has a future-time commit + // timestamp, evaluating the request should return an error. + expError: commitTrigger, + }, + } { + t.Run(cfg.name, func(t *testing.T) { + ctx := context.Background() + db := storage.NewDefaultInMemForTesting() + defer db.Close() + batch := db.NewBatch() + defer batch.Close() + + manual := hlc.NewManualClock(123) + clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) + desc := roachpb.RangeDescriptor{ + RangeID: 99, + StartKey: roachpb.RKey("a"), + EndKey: roachpb.RKey("z"), + } + + now := clock.Now() + commitTS := cfg.commitTS(now) + txn := roachpb.MakeTransaction("test", desc.StartKey.AsRawKey(), 0, now, 0) + txn.ReadTimestamp = commitTS + txn.WriteTimestamp = commitTS + + // Issue the end txn command. 
+ req := roachpb.EndTxnRequest{ + RequestHeader: roachpb.RequestHeader{Key: txn.Key}, + Commit: true, + } + if commitTrigger { + req.InternalCommitTrigger = &roachpb.InternalCommitTrigger{ + ModifiedSpanTrigger: &roachpb.ModifiedSpanTrigger{SystemConfigSpan: true}, + } + } + var resp roachpb.EndTxnResponse + _, err := EndTxn(ctx, batch, CommandArgs{ + EvalCtx: (&MockEvalCtx{ + Desc: &desc, + Clock: clock, + CanCreateTxn: func() (bool, hlc.Timestamp, roachpb.TransactionAbortedReason) { + return true, hlc.Timestamp{}, 0 + }, + }).EvalContext(), + Args: &req, + Header: roachpb.Header{ + Timestamp: commitTS, + Txn: &txn, + }, + }, &resp) + + if cfg.expError { + require.Error(t, err) + require.Regexp(t, `txn .* with modified-span \(system-config\) commit trigger needs commit wait`, err) + } else { + require.NoError(t, err) + } + }) + } + }) +} diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index 32a25ae6e70d..fb409d3f35bf 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -3512,16 +3512,48 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + // Detect splits and merges over the global read ranges. Assert that the split + // and merge transactions commit with synthetic timestamps, and that the + // commit-wait sleep for these transactions is performed before running their + // commit triggers instead of being run on the kv client. For details on why this is + // necessary, see maybeCommitWaitBeforeCommitTrigger. 
+ var clock atomic.Value + var splitsWithSyntheticTS, mergesWithSyntheticTS int64 + respFilter := func(ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error { + if req, ok := ba.GetArg(roachpb.EndTxn); ok { + endTxn := req.(*roachpb.EndTxnRequest) + if br.Txn.Status == roachpb.COMMITTED && br.Txn.WriteTimestamp.Synthetic { + if ct := endTxn.InternalCommitTrigger; ct != nil { + // The server-side commit-wait sleep should ensure that the commit + // triggers are only run after the commit timestamp is below present + // time. + now := clock.Load().(*hlc.Clock).Now() + require.True(t, br.Txn.WriteTimestamp.Less(now)) + + switch { + case ct.SplitTrigger != nil: + atomic.AddInt64(&splitsWithSyntheticTS, 1) + case ct.MergeTrigger != nil: + atomic.AddInt64(&mergesWithSyntheticTS, 1) + } + } + } + } + return nil + } + ctx := context.Background() serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ Knobs: base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ - DisableMergeQueue: true, + DisableMergeQueue: true, + TestingResponseFilter: respFilter, }, }, }) s := serv.(*server.TestServer) defer s.Stopper().Stop(ctx) + clock.Store(s.Clock()) store, err := s.Stores().GetStore(s.GetFirstStoreID()) require.NoError(t, err) config.TestingSetupZoneConfigHook(s.Stopper()) @@ -3543,11 +3575,18 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) { return nil }) + // Write to the range, which has the effect of bumping the closed timestamp. + pArgs := putArgs(descKey, []byte("foo")) + _, pErr := kv.SendWrapped(ctx, store.TestSender(), pArgs) + require.Nil(t, pErr) + // Split the range. Should succeed. splitKey := append(descKey, []byte("split")...) 
splitArgs := adminSplitArgs(splitKey) - _, pErr := kv.SendWrapped(ctx, store.TestSender(), splitArgs) + _, pErr = kv.SendWrapped(ctx, store.TestSender(), splitArgs) require.Nil(t, pErr) + require.Equal(t, int64(1), store.Metrics().CommitWaitsBeforeCommitTrigger.Count()) + require.Equal(t, int64(1), atomic.LoadInt64(&splitsWithSyntheticTS)) repl := store.LookupReplica(roachpb.RKey(splitKey)) require.Equal(t, splitKey, repl.Desc().StartKey.AsRawKey()) @@ -3556,6 +3595,8 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) { mergeArgs := adminMergeArgs(descKey) _, pErr = kv.SendWrapped(ctx, store.TestSender(), mergeArgs) require.Nil(t, pErr) + require.Equal(t, int64(2), store.Metrics().CommitWaitsBeforeCommitTrigger.Count()) + require.Equal(t, int64(1), atomic.LoadInt64(&mergesWithSyntheticTS)) repl = store.LookupReplica(roachpb.RKey(splitKey)) require.Equal(t, descKey, repl.Desc().StartKey.AsRawKey()) diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 634c99d90217..691dfef3a58a 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -294,6 +294,15 @@ var ( Unit: metric.Unit_COUNT, } + // Server-side transaction metrics. + metaCommitWaitBeforeCommitTriggerCount = metric.Metadata{ + Name: "txn.commit_waits.before_commit_trigger", + Help: "Number of KV transactions that had to commit-wait on the server " + + "before committing because they had a commit trigger", + Measurement: "KV Transactions", + Unit: metric.Unit_COUNT, + } + // RocksDB metrics. metaRdbBlockCacheHits = metric.Metadata{ Name: "rocksdb.block.cache.hits", @@ -1084,6 +1093,9 @@ type StoreMetrics struct { // Follower read metrics. FollowerReadsCount *metric.Counter + // Server-side transaction metrics. + CommitWaitsBeforeCommitTrigger *metric.Counter + // RocksDB metrics. RdbBlockCacheHits *metric.Gauge RdbBlockCacheMisses *metric.Gauge @@ -1455,6 +1467,9 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { // Follower reads metrics. 
FollowerReadsCount: metric.NewCounter(metaFollowerReadsCount), + // Server-side transaction metrics. + CommitWaitsBeforeCommitTrigger: metric.NewCounter(metaCommitWaitBeforeCommitTriggerCount), + // RocksDB metrics. RdbBlockCacheHits: metric.NewGauge(metaRdbBlockCacheHits), RdbBlockCacheMisses: metric.NewGauge(metaRdbBlockCacheMisses), diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 863c70e8f977..25c91288a44a 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -70,6 +70,9 @@ func (r *Replica) sendWithRangeID( if err := r.maybeRateLimitBatch(ctx, ba); err != nil { return nil, roachpb.NewError(err) } + if err := r.maybeCommitWaitBeforeCommitTrigger(ctx, ba); err != nil { + return nil, roachpb.NewError(err) + } // NB: must be performed before collecting request spans. ba, err := maybeStripInFlightWrites(ba) @@ -124,6 +127,94 @@ func (r *Replica) sendWithRangeID( return br, pErr } +// maybeCommitWaitBeforeCommitTrigger detects batches that are attempting to +// commit a transaction with a commit trigger and that will need to perform a +// commit-wait at some point. For reasons described below, transactions with +// commit triggers need to perform their commit wait sleep before their trigger +// runs, so this function eagerly performs that sleep before the batch moves on +// to evaluation. The function guarantees that if the transaction ends up +// committing with its current provisional commit timestamp, it will not need to +// commit wait any further. +func (r *Replica) maybeCommitWaitBeforeCommitTrigger( + ctx context.Context, ba *roachpb.BatchRequest, +) error { + args, hasET := ba.GetArg(roachpb.EndTxn) + if !hasET { + return nil + } + et := args.(*roachpb.EndTxnRequest) + if !et.Commit || et.InternalCommitTrigger == nil { + // Not committing with a commit trigger. 
+ return nil + } + txn := ba.Txn + if txn.ReadTimestamp != txn.WriteTimestamp && !ba.CanForwardReadTimestamp { + // The commit cannot succeed. + return nil + } + + // A transaction is committing with a commit trigger. This means that it has + // side-effects beyond those of the intents that it has written. + // + // If the transaction has a commit timestamp in the future of present time, it + // will need to commit-wait before acknowledging the client. Typically, this + // is performed in the TxnCoordSender after the transaction has committed and + // resolved its intents (see TxnCoordSender.maybeCommitWait). It is safe to + // wait after a future-time transaction has committed and resolved intents + // without compromising linearizability because the uncertainty interval of + // concurrent and later readers ensures atomic visibility of the effects of + // the transaction. In other words, all of the transaction's intents will + // become visible and will remain visible at once, which is sometimes called + // "monotonic reads". This is true even if the resolved intents are at a high + // enough timestamp such that they are not visible to concurrent readers + // immediately after they are resolved, but only become visible sometime + // during the writer's commit-wait sleep. This property is central to the + // correctness of non-blocking transactions. See: + // https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20200811_non_blocking_txns.md + // + // However, if a transaction has a commit trigger, the side-effects of the + // trigger will go into effect immediately after the EndTxn's Raft command is + // applied to the Raft state machine. This poses a problem, because we do not + // want part of a transaction's effects (e.g. its commit trigger) to become + // visible to onlookers before the rest of its effects do (e.g. its intent + // writes). 
To avoid this problem, we perform the commit-wait stage of a + // transaction with a commit trigger early, before its commit triggers fire. + // This results in the transaction waiting longer to commit and resolve its + // intents, but is otherwise safe and effective. + // + // NOTE: it would be easier to perform this wait during the evaluation of the + // corresponding EndTxn request instead of detecting the case here. However, + // we intentionally do not commit wait during evaluation because we do not + // want to sleep while holding latches and blocking other requests. So + // instead, we commit wait here and then assert that transactions with commit + // triggers do not need to commit wait further by the time they reach command + // evaluation. + // + // NOTE: just like in TxnCoordSender.maybeCommitWait, we only need to perform + // a commit-wait sleep if the commit timestamp is "synthetic". Otherwise, it + // is known not to be in advance of present time. + if !txn.WriteTimestamp.Synthetic { + return nil + } + if !r.Clock().Now().Less(txn.WriteTimestamp) { + return nil + } + + waitUntil := txn.WriteTimestamp + before := r.Clock().PhysicalTime() + est := waitUntil.GoTime().Sub(before) + log.VEventf(ctx, 1, "performing server-side commit-wait sleep for ~%s", est) + + if err := r.Clock().SleepUntil(ctx, waitUntil); err != nil { + return err + } + + after := r.Clock().PhysicalTime() + log.VEventf(ctx, 1, "completed server-side commit-wait sleep, took %s", after.Sub(before)) + r.store.metrics.CommitWaitsBeforeCommitTrigger.Inc(1) + return nil +} + // maybeAddRangeInfoToResponse populates br.RangeInfo if the client doesn't // have up-to-date info about the range's descriptor and lease. 
func (r *Replica) maybeAddRangeInfoToResponse( diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index eafcd18ad9dd..30b90f715baa 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -551,7 +551,7 @@ func (r *Replica) evaluate1PC( if err != nil { return onePCResult{ success: onePCFailed, - pErr: roachpb.NewErrorf("failed to run commit trigger: %s", err), + pErr: roachpb.NewError(errors.Wrap(err, "failed to run commit trigger")), } } if err := res.MergeAndDestroy(innerResult); err != nil { diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go index a815081afa55..4916d313ed57 100644 --- a/pkg/roachpb/data.go +++ b/pkg/roachpb/data.go @@ -861,6 +861,31 @@ func (v Value) PrettyPrint() string { return buf.String() } +// Kind returns the kind of commit trigger as a string, panicking on an unknown kind. +func (ct InternalCommitTrigger) Kind() string { + switch { + case ct.SplitTrigger != nil: + return "split" + case ct.MergeTrigger != nil: + return "merge" + case ct.ChangeReplicasTrigger != nil: + return "change-replicas" + case ct.ModifiedSpanTrigger != nil: + switch { + case ct.ModifiedSpanTrigger.SystemConfigSpan: + return "modified-span (system-config)" + case ct.ModifiedSpanTrigger.NodeLivenessSpan != nil: + return "modified-span (node-liveness)" + default: + panic("unknown modified-span commit trigger kind") + } + case ct.StickyBitTrigger != nil: + return "sticky-bit" + default: + panic("unknown commit trigger kind") + } +} + // IsFinalized determines whether the transaction status is in a finalized // state. A finalized state is terminal, meaning that once a transaction // enters one of these states, it will never leave it. 
diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index bf4e20befd0b..1e47c73b7ed4 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -967,6 +967,7 @@ var charts = []sectionDescription{ "txn.commits1PC", "txn.parallelcommits", "txn.commit_waits", + "txn.commit_waits.before_commit_trigger", }, }, {