diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 76bb857bfdac..9d6000e90fcc 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -53,7 +53,7 @@ kv.snapshot_recovery.max_ratebyte size8.0 MiBthe rate limit (bytes/sec) to use for recovery snapshots kv.transaction.max_intents_bytesinteger262144maximum number of bytes used to track write intents in transactions kv.transaction.max_refresh_spans_bytesinteger256000maximum number of bytes used to track refresh spans in serializable transactions -kv.transaction.parallel_commits_enabledbooleantrueif enabled, transactional commits will be parallelized with transactional writes +kv.transaction.parallel_commits_enabledbooleanfalseif enabled, transactional commits will be parallelized with transactional writes kv.transaction.write_pipelining_enabledbooleantrueif enabled, transactional writes are pipelined through Raft consensus kv.transaction.write_pipelining_max_batch_sizeinteger128if non-zero, defines that maximum size batch that will be pipelined through Raft consensus kv.transaction.write_pipelining_max_outstanding_sizebyte size256 KiBmaximum number of bytes used to track in-flight pipelined writes before disabling pipelining diff --git a/pkg/kv/txn_interceptor_committer.go b/pkg/kv/txn_interceptor_committer.go index 623b50683f3e..ae2efcd62a6d 100644 --- a/pkg/kv/txn_interceptor_committer.go +++ b/pkg/kv/txn_interceptor_committer.go @@ -28,7 +28,7 @@ import ( var parallelCommitsEnabled = settings.RegisterBoolSetting( "kv.transaction.parallel_commits_enabled", "if enabled, transactional commits will be parallelized with transactional writes", - true, + false, ) // txnCommitter is a txnInterceptor that concerns itself with committing and @@ -152,7 +152,13 @@ func (tc *txnCommitter) SendLocked( // write set; no writes will be in-flight concurrently with the EndTransaction // request. if len(et.InFlightWrites) > 0 && !tc.canCommitInParallelWithWrites(ba, et) { - et.IntentSpans = mergeIntoSpans(et.IntentSpans, et.InFlightWrites) + // NB: when parallel commits is disabled, this is the best place to + // detect whether the batch has only distinct spans. We can set this + // flag based on whether any of previously declared in-flight writes + // in this batch overlap with each other. This will have (rare) false + // negatives when the in-flight writes overlap with existing intent + // spans, but never false positives. + et.IntentSpans, ba.Header.DistinctSpans = mergeIntoSpans(et.IntentSpans, et.InFlightWrites) // Disable parallel commits. et.InFlightWrites = nil } @@ -223,7 +229,7 @@ func (tc *txnCommitter) SendLocked( // determination about the status of our STAGING transaction. To avoid this, // we transition to an explicitly committed transaction as soon as possible. // This also has the side-effect of kicking off intent resolution. - mergedIntentSpans := mergeIntoSpans(et.IntentSpans, et.InFlightWrites) + mergedIntentSpans, _ := mergeIntoSpans(et.IntentSpans, et.InFlightWrites) tc.makeTxnCommitExplicitAsync(ctx, br.Txn, mergedIntentSpans) // Switch the status on the batch response's transaction to COMMITTED. No @@ -317,15 +323,17 @@ func (tc *txnCommitter) canCommitInParallelWithWrites( return true } -func mergeIntoSpans(s []roachpb.Span, ws []roachpb.SequencedWrite) []roachpb.Span { +// mergeIntoSpans merges all provided sequenced writes into the span slice. It +// then sorts the spans and merges an that overlap. Returns true iff all of the +// spans are distinct. +func mergeIntoSpans(s []roachpb.Span, ws []roachpb.SequencedWrite) ([]roachpb.Span, bool) { if s == nil { s = make([]roachpb.Span, 0, len(ws)) } for _, w := range ws { s = append(s, roachpb.Span{Key: w.Key}) } - s, _ = roachpb.MergeSpans(s) - return s + return roachpb.MergeSpans(s) } // needTxnRetryAfterStaging determines whether the transaction needs to refresh diff --git a/pkg/kv/txn_interceptor_committer_test.go b/pkg/kv/txn_interceptor_committer_test.go index d02efc9937c7..f314d1c5eeb8 100644 --- a/pkg/kv/txn_interceptor_committer_test.go +++ b/pkg/kv/txn_interceptor_committer_test.go @@ -28,9 +28,12 @@ import ( ) func makeMockTxnCommitter() (txnCommitter, *mockLockedSender) { + st := cluster.MakeTestingClusterSettings() + // TODO(nvanbenschoten): remove when parallel commits are enabled. + parallelCommitsEnabled.Override(&st.SV, true) mockSender := &mockLockedSender{} return txnCommitter{ - st: cluster.MakeTestingClusterSettings(), + st: st, stopper: stop.NewStopper(), wrapped: mockSender, mu: new(syncutil.Mutex),