Skip to content

Commit

Permalink
kvcoord: merge budgets for in-flight writes and lock spans
Browse files Browse the repository at this point in the history
Before this patch, the txnPipeliner maintained two different memory
budgets:

1) kv.transaction.max_intents_bytes - a limit on a txn's lock spans
2) kv.transaction.write_pipelining_max_outstanding_size - a limit on a
   txn's in-flight writes

Besides protecting memory usage, these guys also prevent the commit's
Raft command from becoming too big.

Having two budgets for very related things is unnecessary. In-flight
writes frequently turn into lock spans, and so thinking about how one of
the budget feeds into the other is confusing. The exhaustion of the
in-flight budget also had a hand in this, by turning in-flight writes
into locks immediately.

This patch makes write_pipelining_max_outstanding_bytes a no-op.
max_intent_bytes takes on also tracking in-flight writes. A request
whose async consensus writes would push this budget over the limit is
not allowed to perform async-consensus, on the argument that performing
sync writes is better because the locks from those writes can be
condensed.

This patch is also done with an eye towards offering an option to reject
transactions that are about to go over budget. Having a single budget to
think about makes that conceptually simpler.

Release note (general change): The setting
`kv.transaction.write_pipelining_max_outstanding_size` becomes a no-op.
Its function is folded into the `kv.transaction.max_intents_bytes`
setting.
  • Loading branch information
andreimatei committed Jul 1, 2021
1 parent 92d6493 commit 4dc342e
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 59 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvcoord/condensable_span_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func (s *condensableSpanSet) mergeAndSort() {
// limit. Condensing is only performed at the level of individual ranges, not
// across ranges, so it's possible to not be able to condense as much as
// desired.
//
// maxBytes <= 0 means that each range will be maximally condensed.
func (s *condensableSpanSet) maybeCondense(
ctx context.Context, riGen rangeIteratorFactory, maxBytes int64,
) bool {
Expand Down
28 changes: 28 additions & 0 deletions pkg/kv/kvclient/kvcoord/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvcoord

import (
"sort"

"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// asSortedSlice returns the set data in sorted order.
//
// Too inefficient for production.
func (s *condensableSpanSet) asSortedSlice() []roachpb.Span {
set := s.asSlice()
cpy := make(roachpb.Spans, len(set))
copy(cpy, set)
sort.Sort(cpy)
return cpy
}
62 changes: 33 additions & 29 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,6 @@ var pipelinedWritesEnabled = settings.RegisterBoolSetting(
"if enabled, transactional writes are pipelined through Raft consensus",
true,
)
var pipelinedWritesMaxInFlightSize = settings.RegisterByteSizeSetting(
// TODO(nvanbenschoten): The need for this extra setting alongside
// kv.transaction.max_intents_bytes indicates that we should explore
// the unification of intent tracking and in-flight write tracking.
// The two mechanisms track subtly different information, but there's
// no fundamental reason why they can't be unified.
"kv.transaction.write_pipelining_max_outstanding_size",
"maximum number of bytes used to track in-flight pipelined writes before disabling pipelining",
1<<18, /* 256 KB */
)
var pipelinedWritesMaxBatchSize = settings.RegisterIntSetting(
"kv.transaction.write_pipelining_max_batch_size",
"if non-zero, defines that maximum size batch that will be pipelined through Raft consensus",
Expand All @@ -57,16 +47,22 @@ var pipelinedWritesMaxBatchSize = settings.RegisterIntSetting(
settings.NonNegativeInt,
)

// trackedWritesMaxSize is a threshold in bytes for lock spans stored on the
// coordinator during the lifetime of a transaction. Locks are included with a
// transaction on commit or abort, to be cleaned up asynchronously. If they
// exceed this threshold, they're condensed to avoid memory blowup both on the
// coordinator and (critically) on the EndTxn command at the Raft group
// responsible for the transaction record.
// trackedWritesMaxSize is a byte threshold for the tracking of writes performed
// a single transaction. This includes the tracking of lock spans and of
// in-flight writes, both stored in the txnPipeliner.
//
// Locks are included with a transaction on commit or abort, to be cleaned up
// asynchronously. If they exceed this threshold, they're condensed to avoid
// memory blowup both on the coordinator and (critically) on the EndTxn command
// at the Raft group responsible for the transaction record.
//
// The in-flight writes are, on the happy path, also attached to the commit
// EndTxn. On less happy paths, they are turned into lock spans.
//
// NB: this is called "max_intents_bytes" instead of "max_lock_bytes" because
// it was created before the concept of intents were generalized to locks.
// Switching it would require a migration which doesn't seem worth it.
// NB: this is called "max_intents_bytes" instead of "max_lock_bytes" because it
// was created before the concept of intents were generalized to locks, and also
// before we introduced in-flight writes with the "parallel commits". Switching
// it would require a migration which doesn't seem worth it.
//
// Note: Default value was arbitrarily set to 256KB but practice showed that
// it could be raised higher. When transaction reaches this limit, intent
Expand Down Expand Up @@ -255,7 +251,7 @@ func (tp *txnPipeliner) SendLocked(
return nil, pErr
}

ba.AsyncConsensus = tp.canUseAsyncConsensus(ba)
ba.AsyncConsensus = tp.canUseAsyncConsensus(ctx, ba)

// Adjust the batch so that it doesn't miss any in-flight writes.
ba = tp.chainToInFlightWrites(ba)
Expand Down Expand Up @@ -343,7 +339,7 @@ func (tp *txnPipeliner) attachLocksToEndTxn(

// canUseAsyncConsensus checks the conditions necessary for this batch to be
// allowed to set the AsyncConsensus flag.
func (tp *txnPipeliner) canUseAsyncConsensus(ba roachpb.BatchRequest) bool {
func (tp *txnPipeliner) canUseAsyncConsensus(ctx context.Context, ba roachpb.BatchRequest) bool {
// Short-circuit for EndTransactions; it's common enough to have batches
// containing a prefix of writes (which, by themselves, are all eligible for
// async consensus) and then an EndTxn (which is not eligible). Note that
Expand All @@ -357,11 +353,10 @@ func (tp *txnPipeliner) canUseAsyncConsensus(ba roachpb.BatchRequest) bool {
return false
}

// We provide a setting to bound the size of in-flight writes that the
// pipeliner is tracking. If this batch would push us over this setting,
// don't allow it to perform async consensus.
// There's a memory budget for lock tracking. If this batch would push us over
// this setting, don't allow it to perform async consensus.
addedIFBytes := int64(0)
maxIFBytes := pipelinedWritesMaxInFlightSize.Get(&tp.st.SV)
maxTrackingBytes := trackedWritesMaxSize.Get(&tp.st.SV)

// We provide a setting to bound the number of writes we permit in a batch
// that uses async consensus. This is useful because we'll have to prove
Expand Down Expand Up @@ -390,15 +385,20 @@ func (tp *txnPipeliner) canUseAsyncConsensus(ba roachpb.BatchRequest) bool {
// worth it.
return false
}
// Only allow batches that would not push us over the maximum
// in-flight write size limit to perform consensus asynchronously.
// Inhibit async consensus if the batch would push us over the maximum
// tracking memory budget. If we allowed async consensus on this batch, its
// writes would need to be tracked precisely. By inhibiting async consensus,
// its writes will only need to be tracked as locks, and we can compress the
// lock spans with loss of fidelity. This helps both memory usage and the
// eventual size of the Raft command for the commit.
//
// NB: this estimation is conservative because it doesn't factor
// in that some writes may be proven by this batch and removed
// from the in-flight write set. The real accounting in
// inFlightWriteSet.{insert,remove} gets this right.
addedIFBytes += keySize(req.Header().Key)
if (tp.ifWrites.byteSize() + addedIFBytes) > maxIFBytes {
if (tp.ifWrites.byteSize() + addedIFBytes + tp.lockFootprint.bytes) > maxTrackingBytes {
log.VEventf(ctx, 2, "cannot perform async consensus because memory budget exceeded")
return false
}
}
Expand Down Expand Up @@ -521,7 +521,11 @@ func (tp *txnPipeliner) updateLockTracking(
// After adding new writes to the lock footprint, check whether we need to
// condense the set to stay below memory limits.
alreadyCondensed := tp.lockFootprint.condensed
condensed := tp.lockFootprint.maybeCondense(ctx, tp.riGen, trackedWritesMaxSize.Get(&tp.st.SV))
// Compute how many bytes we can allocate for locks. We account for the
// inflight-writes conservatively, since these might turn into lock spans
// later.
locksBudget := trackedWritesMaxSize.Get(&tp.st.SV) - tp.ifWrites.byteSize()
condensed := tp.lockFootprint.maybeCondense(ctx, tp.riGen, locksBudget)
if condensed && !alreadyCondensed {
if tp.condensedIntentsEveryN.ShouldLog() || log.ExpensiveLogEnabled(ctx, 2) {
log.Warningf(ctx,
Expand Down
Loading

0 comments on commit 4dc342e

Please sign in to comment.