Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvcoord: merge budgets for in-flight writes and lock spans #66915

Merged
merged 3 commits into from
Jul 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ go_test(
"dist_sender_rangefeed_test.go",
"dist_sender_server_test.go",
"dist_sender_test.go",
"helpers_test.go",
"integration_test.go",
"main_test.go",
"range_iter_test.go",
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvcoord/condensable_span_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func (s *condensableSpanSet) mergeAndSort() {
// limit. Condensing is only performed at the level of individual ranges, not
// across ranges, so it's possible to not be able to condense as much as
// desired.
//
// maxBytes <= 0 means that each range will be maximally condensed.
func (s *condensableSpanSet) maybeCondense(
ctx context.Context, riGen rangeIteratorFactory, maxBytes int64,
) bool {
Expand Down
28 changes: 28 additions & 0 deletions pkg/kv/kvclient/kvcoord/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvcoord

import (
"sort"

"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// asSortedSlice returns the set data in sorted order.
//
// Too inefficient for production.
func (s *condensableSpanSet) asSortedSlice() []roachpb.Span {
set := s.asSlice()
cpy := make(roachpb.Spans, len(set))
copy(cpy, set)
sort.Sort(cpy)
return cpy
}
101 changes: 60 additions & 41 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,6 @@ var pipelinedWritesEnabled = settings.RegisterBoolSetting(
"if enabled, transactional writes are pipelined through Raft consensus",
true,
)
var pipelinedWritesMaxInFlightSize = settings.RegisterByteSizeSetting(
// TODO(nvanbenschoten): The need for this extra setting alongside
// kv.transaction.max_intents_bytes indicates that we should explore
// the unification of intent tracking and in-flight write tracking.
// The two mechanisms track subtly different information, but there's
// no fundamental reason why they can't be unified.
"kv.transaction.write_pipelining_max_outstanding_size",
"maximum number of bytes used to track in-flight pipelined writes before disabling pipelining",
1<<18, /* 256 KB */
)
var pipelinedWritesMaxBatchSize = settings.RegisterIntSetting(
"kv.transaction.write_pipelining_max_batch_size",
"if non-zero, defines that maximum size batch that will be pipelined through Raft consensus",
Expand All @@ -57,16 +47,22 @@ var pipelinedWritesMaxBatchSize = settings.RegisterIntSetting(
settings.NonNegativeInt,
)

// trackedWritesMaxSize is a threshold in bytes for lock spans stored on the
// coordinator during the lifetime of a transaction. Locks are included with a
// transaction on commit or abort, to be cleaned up asynchronously. If they
// exceed this threshold, they're condensed to avoid memory blowup both on the
// coordinator and (critically) on the EndTxn command at the Raft group
// responsible for the transaction record.
// trackedWritesMaxSize is a byte threshold for the tracking of writes performed
// by a single transaction. This includes the tracking of lock spans and of
// in-flight writes, both stored in the txnPipeliner.
//
// Locks are included with a transaction on commit or abort, to be cleaned up
// asynchronously. If they exceed this threshold, they're condensed to avoid
// memory blowup both on the coordinator and (critically) on the EndTxn command
// at the Raft group responsible for the transaction record.
//
// NB: this is called "max_intents_bytes" instead of "max_lock_bytes" because
// it was created before the concept of intents were generalized to locks.
// Switching it would require a migration which doesn't seem worth it.
// The in-flight writes are, on the happy path, also attached to the commit
// EndTxn. On less happy paths, they are turned into lock spans.
//
// NB: this is called "max_intents_bytes" instead of "max_lock_bytes" because it
// was created before the concept of intents was generalized to locks, and also
// before we introduced in-flight writes with the "parallel commits". Switching
// it would require a migration which doesn't seem worth it.
//
// Note: Default value was arbitrarily set to 256KB but practice showed that
// it could be raised higher. When a transaction reaches this limit, intent
Expand Down Expand Up @@ -255,7 +251,7 @@ func (tp *txnPipeliner) SendLocked(
return nil, pErr
}

ba.AsyncConsensus = tp.canUseAsyncConsensus(ba)
ba.AsyncConsensus = tp.canUseAsyncConsensus(ctx, ba)

// Adjust the batch so that it doesn't miss any in-flight writes.
ba = tp.chainToInFlightWrites(ba)
Expand Down Expand Up @@ -343,16 +339,24 @@ func (tp *txnPipeliner) attachLocksToEndTxn(

// canUseAsyncConsensus checks the conditions necessary for this batch to be
// allowed to set the AsyncConsensus flag.
func (tp *txnPipeliner) canUseAsyncConsensus(ba roachpb.BatchRequest) bool {
func (tp *txnPipeliner) canUseAsyncConsensus(ctx context.Context, ba roachpb.BatchRequest) bool {
// Short-circuit for EndTransactions; it's common enough to have batches
// containing a prefix of writes (which, by themselves, are all eligible for
// async consensus) and then an EndTxn (which is not eligible). Note that
// ba.GetArg() is efficient for EndTransactions, having its own internal
// optimization.
if _, hasET := ba.GetArg(roachpb.EndTxn); hasET {
return false
}

if !pipelinedWritesEnabled.Get(&tp.st.SV) || tp.disabled {
return false
}

// We provide a setting to bound the size of in-flight writes that the
// pipeliner is tracking. If this batch would push us over this setting,
// don't allow it to perform async consensus.
// There's a memory budget for lock tracking. If this batch would push us over
// this setting, don't allow it to perform async consensus.
addedIFBytes := int64(0)
maxIFBytes := pipelinedWritesMaxInFlightSize.Get(&tp.st.SV)
maxTrackingBytes := trackedWritesMaxSize.Get(&tp.st.SV)

// We provide a setting to bound the number of writes we permit in a batch
// that uses async consensus. This is useful because we'll have to prove
Expand Down Expand Up @@ -381,15 +385,20 @@ func (tp *txnPipeliner) canUseAsyncConsensus(ba roachpb.BatchRequest) bool {
// worth it.
return false
}
// Only allow batches that would not push us over the maximum
// in-flight write size limit to perform consensus asynchronously.
// Inhibit async consensus if the batch would push us over the maximum
// tracking memory budget. If we allowed async consensus on this batch, its
// writes would need to be tracked precisely. By inhibiting async consensus,
// its writes will only need to be tracked as locks, and we can compress the
// lock spans with loss of fidelity. This helps both memory usage and the
// eventual size of the Raft command for the commit.
//
// NB: this estimation is conservative because it doesn't factor
// in that some writes may be proven by this batch and removed
// from the in-flight write set. The real accounting in
// inFlightWriteSet.{insert,remove} gets this right.
addedIFBytes += keySize(req.Header().Key)
if (tp.ifWrites.byteSize() + addedIFBytes) > maxIFBytes {
if (tp.ifWrites.byteSize() + addedIFBytes + tp.lockFootprint.bytes) > maxTrackingBytes {
log.VEventf(ctx, 2, "cannot perform async consensus because memory budget exceeded")
return false
}
}
Expand Down Expand Up @@ -505,23 +514,33 @@ func (tp *txnPipeliner) chainToInFlightWrites(ba roachpb.BatchRequest) roachpb.B
func (tp *txnPipeliner) updateLockTracking(
ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse,
) {
tp.updateLockTrackingInner(ctx, ba, br)

// Deal with compacting the lock spans.

// After adding new writes to the lock footprint, check whether we need to
// condense the set to stay below memory limits.
defer func() {
alreadyCondensed := tp.lockFootprint.condensed
condensed := tp.lockFootprint.maybeCondense(ctx, tp.riGen, trackedWritesMaxSize.Get(&tp.st.SV))
if condensed && !alreadyCondensed {
if tp.condensedIntentsEveryN.ShouldLog() || log.ExpensiveLogEnabled(ctx, 2) {
log.Warningf(ctx,
"a transaction has hit the intent tracking limit (kv.transaction.max_intents_bytes); "+
"is it a bulk operation? Intent cleanup will be slower. txn: %s ba: %s",
ba.Txn, ba.Summary())
}
tp.txnMetrics.TxnsWithCondensedIntents.Inc(1)
tp.txnMetrics.TxnsWithCondensedIntentsGauge.Inc(1)
alreadyCondensed := tp.lockFootprint.condensed
// Compute how many bytes we can allocate for locks. We account for the
// inflight-writes conservatively, since these might turn into lock spans
// later.
locksBudget := trackedWritesMaxSize.Get(&tp.st.SV) - tp.ifWrites.byteSize()
condensed := tp.lockFootprint.maybeCondense(ctx, tp.riGen, locksBudget)
if condensed && !alreadyCondensed {
if tp.condensedIntentsEveryN.ShouldLog() || log.ExpensiveLogEnabled(ctx, 2) {
log.Warningf(ctx,
"a transaction has hit the intent tracking limit (kv.transaction.max_intents_bytes); "+
"is it a bulk operation? Intent cleanup will be slower. txn: %s ba: %s",
ba.Txn, ba.Summary())
}
}()
tp.txnMetrics.TxnsWithCondensedIntents.Inc(1)
tp.txnMetrics.TxnsWithCondensedIntentsGauge.Inc(1)
}
}

func (tp *txnPipeliner) updateLockTrackingInner(
ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse,
) {
// If the request failed, add all lock acquisitions attempts directly to the
// lock footprint. This reduces the likelihood of dangling locks blocking
// concurrent requests for extended periods of time. See #3346.
Expand Down
Loading