Skip to content

Commit

Permalink
kv: pipeline replicated lock acquisition
Browse files Browse the repository at this point in the history
Fixes #117978.

This commit completes the client-side handling of replicated lock acquisition
pipelining. Replicated lock acquisition through Get, Scan, and ReverseScan
requests now qualifies to be pipelined. The `txnPipeliner` is updated to track
the strength associated with each in-flight write and pass that along to the
corresponding QueryIntentRequest.

Release note: None
  • Loading branch information
nvanbenschoten committed Apr 2, 2024
1 parent bbcddba commit dec4830
Show file tree
Hide file tree
Showing 13 changed files with 703 additions and 81 deletions.
4 changes: 4 additions & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ kv.rangefeed.range_stuck_threshold duration 1m0s restart rangefeeds if they don'
kv.transaction.max_intents_bytes integer 4194304 maximum number of bytes used to track locks in transactions application
kv.transaction.max_refresh_spans_bytes integer 4194304 maximum number of bytes used to track refresh spans in serializable transactions application
kv.transaction.reject_over_max_intents_budget.enabled boolean false if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed application
kv.transaction.write_pipelining.locking_reads.enabled boolean true if enabled, transactional locking reads are pipelined through Raft consensus application
kv.transaction.write_pipelining.ranged_writes.enabled boolean true if enabled, transactional ranged writes are pipelined through Raft consensus application
kv.transaction.write_pipelining.enabled boolean true if enabled, transactional writes are pipelined through Raft consensus application
kv.transaction.write_pipelining.max_batch_size integer 128 if non-zero, defines that maximum size batch that will be pipelined through Raft consensus application
schedules.backup.gc_protection.enabled boolean true enable chaining of GC protection across backups run as part of a schedule application
security.ocsp.mode enumeration off use OCSP to check whether TLS certificates are revoked. If the OCSP server is unreachable, in strict mode all certificates will be rejected and in lax mode all certificates will be accepted. [off = 0, lax = 1, strict = 2] application
security.ocsp.timeout duration 3s timeout before considering the OCSP server unreachable application
Expand Down
4 changes: 4 additions & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@
<tr><td><div id="setting-kv-transaction-max-intents-bytes" class="anchored"><code>kv.transaction.max_intents_bytes</code></div></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track locks in transactions</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-max-refresh-spans-bytes" class="anchored"><code>kv.transaction.max_refresh_spans_bytes</code></div></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-reject-over-max-intents-budget-enabled" class="anchored"><code>kv.transaction.reject_over_max_intents_budget.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-write-pipelining-locking-reads-enabled" class="anchored"><code>kv.transaction.write_pipelining.locking_reads.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional locking reads are pipelined through Raft consensus</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-write-pipelining-ranged-writes-enabled" class="anchored"><code>kv.transaction.write_pipelining.ranged_writes.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional ranged writes are pipelined through Raft consensus</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-write-pipelining-enabled" class="anchored"><code>kv.transaction.write_pipelining.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional writes are pipelined through Raft consensus</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-write-pipelining-max-batch-size" class="anchored"><code>kv.transaction.write_pipelining.max_batch_size</code></div></td><td>integer</td><td><code>128</code></td><td>if non-zero, defines that maximum size batch that will be pipelined through Raft consensus</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kvadmission-store-provisioned-bandwidth" class="anchored"><code>kvadmission.store.provisioned_bandwidth</code></div></td><td>byte size</td><td><code>0 B</code></td><td>if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), for each store. It can be overridden on a per-store basis using the --store flag. Note that setting the provisioned bandwidth to a positive value may enable disk bandwidth based admission control, since admission.disk_bandwidth_tokens.elastic.enabled defaults to true</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-schedules-backup-gc-protection-enabled" class="anchored"><code>schedules.backup.gc_protection.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>enable chaining of GC protection across backups run as part of a schedule</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-security-ocsp-mode" class="anchored"><code>security.ocsp.mode</code></div></td><td>enumeration</td><td><code>off</code></td><td>use OCSP to check whether TLS certificates are revoked. If the OCSP server is unreachable, in strict mode all certificates will be rejected and in lax mode all certificates will be accepted. [off = 0, lax = 1, strict = 2]</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
Expand Down
100 changes: 75 additions & 25 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"math"
"sort"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -39,7 +40,25 @@ var PipelinedWritesEnabled = settings.RegisterBoolSetting(
"if enabled, transactional writes are pipelined through Raft consensus",
true,
settings.WithName("kv.transaction.write_pipelining.enabled"),
settings.WithPublic,
)

var pipelinedRangedWritesEnabled = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"kv.transaction.write_pipelining.ranged_writes.enabled",
"if enabled, transactional ranged writes are pipelined through Raft consensus",
true,
settings.WithPublic,
)

var pipelinedLockingReadsEnabled = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"kv.transaction.write_pipelining.locking_reads.enabled",
"if enabled, transactional locking reads are pipelined through Raft consensus",
true,
settings.WithPublic,
)

var pipelinedWritesMaxBatchSize = settings.RegisterIntSetting(
settings.ApplicationLevel,
"kv.transaction.write_pipelining_max_batch_size",
Expand All @@ -55,6 +74,7 @@ var pipelinedWritesMaxBatchSize = settings.RegisterIntSetting(
128,
settings.NonNegativeInt,
settings.WithName("kv.transaction.write_pipelining.max_batch_size"),
settings.WithPublic,
)

// TrackedWritesMaxSize is a byte threshold for the tracking of writes performed
Expand Down Expand Up @@ -416,7 +436,7 @@ func (tp *txnPipeliner) attachLocksToEndTxn(
// and forgo a parallel commit, but let's not break that abstraction
// boundary here.
if kvpb.IsIntentWrite(req) && !kvpb.IsRange(req) {
w := roachpb.SequencedWrite{Key: h.Key, Sequence: h.Sequence}
w := roachpb.SequencedWrite{Key: h.Key, Sequence: h.Sequence, Strength: lock.Intent}
et.InFlightWrites = append(et.InFlightWrites, w)
} else {
et.LockSpans = append(et.LockSpans, h.Span())
Expand All @@ -433,7 +453,7 @@ func (tp *txnPipeliner) attachLocksToEndTxn(
log.Infof(ctx, "intent: [%s,%s)", intent.Key, intent.EndKey)
}
for _, write := range et.InFlightWrites {
log.Infof(ctx, "in-flight: %d:%s", write.Sequence, write.Key)
log.Infof(ctx, "in-flight: %d:%s (%s)", write.Sequence, write.Key, write.Strength)
}
}
return ba, nil
Expand Down Expand Up @@ -500,6 +520,21 @@ func (tp *txnPipeliner) canUseAsyncConsensus(ctx context.Context, ba *kvpb.Batch
return false
}

if kvpb.IsRange(req) {
if !pipelinedRangedWritesEnabled.Get(&tp.st.SV) {
return false
}
}

if !kvpb.IsIntentWrite(req) {
if !pipelinedLockingReadsEnabled.Get(&tp.st.SV) {
return false
}
if !tp.st.Version.IsActive(ctx, clusterversion.V24_1_ReplicatedLockPipelining) {
return false
}
}

// Inhibit async consensus if the batch would push us over the maximum
// tracking memory budget. If we allowed async consensus on this batch, its
// writes would need to be tracked precisely. By inhibiting async consensus,
Expand Down Expand Up @@ -574,7 +609,10 @@ func (tp *txnPipeliner) chainToInFlightWrites(ba *kvpb.BatchRequest) *kvpb.Batch
RequestHeader: kvpb.RequestHeader{
Key: w.Key,
},
Txn: meta,
Txn: meta,
Strength: w.Strength,
// TODO: test, maybe extend TestTxnPipelinerSavepoints
IgnoredSeqNums: ba.Txn.IgnoredSeqNums,
ErrorIfMissing: true,
})

Expand Down Expand Up @@ -758,21 +796,25 @@ func (tp *txnPipeliner) updateLockTrackingInner(
// in the future.
qiResp := resp.(*kvpb.QueryIntentResponse)
if qiResp.FoundIntent || qiResp.FoundUnpushedIntent {
tp.ifWrites.remove(qiReq.Key, qiReq.Txn.Sequence)
tp.ifWrites.remove(qiReq.Key, qiReq.Txn.Sequence, qiReq.Strength)
// Move to lock footprint.
tp.lockFootprint.insert(roachpb.Span{Key: qiReq.Key})
}
} else if kvpb.IsLocking(req) {
// If the request intended to acquire locks, track its lock spans.
seq := req.Header().Sequence
str := lock.Intent
if readOnlyReq, ok := req.(kvpb.LockingReadRequest); ok {
str, _ = readOnlyReq.KeyLocking()
}
trackLocks := func(span roachpb.Span, _ lock.Durability) {
if ba.AsyncConsensus {
// Record any writes that were performed asynchronously. We'll
// need to prove that these succeeded sometime before we commit.
if span.EndKey != nil {
log.Fatalf(ctx, "unexpected multi-key intent pipelined")
}
tp.ifWrites.insert(span.Key, seq)
tp.ifWrites.insert(span.Key, seq, str)
} else {
// If the lock acquisitions weren't performed asynchronously
// then add them directly to our lock footprint.
Expand Down Expand Up @@ -853,7 +895,7 @@ func (tp *txnPipeliner) populateLeafInputState(tis *roachpb.LeafTxnInputState) {
func (tp *txnPipeliner) initializeLeaf(tis *roachpb.LeafTxnInputState) {
// Copy all in-flight writes into the inFlightWrite tree.
for _, w := range tis.InFlightWrites {
tp.ifWrites.insert(w.Key, w.Sequence)
tp.ifWrites.insert(w.Key, w.Sequence, w.Strength)
}
}

Expand Down Expand Up @@ -904,7 +946,7 @@ func (tp *txnPipeliner) rollbackToSavepointLocked(ctx context.Context, s savepoi
// been verified in the meantime) by removing all the extra ones.
if needCollecting {
for _, ifw := range writesToDelete {
tp.ifWrites.remove(ifw.Key, ifw.Sequence)
tp.ifWrites.remove(ifw.Key, ifw.Sequence, ifw.Strength)
}
} else {
tp.ifWrites.clear(true /* reuse */)
Expand All @@ -926,7 +968,7 @@ func (tp *txnPipeliner) hasAcquiredLocks() bool {

// inFlightWrites represent a commitment to proving (via QueryIntent) that
// a point write succeeded in replicating an intent with a specific sequence
// number.
// number and strength.
type inFlightWrite struct {
roachpb.SequencedWrite
// chainIndex is used to avoid chaining on to the same in-flight write
Expand All @@ -936,15 +978,17 @@ type inFlightWrite struct {
}

// makeInFlightWrite constructs an inFlightWrite.
func makeInFlightWrite(key roachpb.Key, seq enginepb.TxnSeq) inFlightWrite {
return inFlightWrite{SequencedWrite: roachpb.SequencedWrite{Key: key, Sequence: seq}}
func makeInFlightWrite(key roachpb.Key, seq enginepb.TxnSeq, str lock.Strength) inFlightWrite {
return inFlightWrite{SequencedWrite: roachpb.SequencedWrite{
Key: key, Sequence: seq, Strength: str,
}}
}

// Less implements the btree.Item interface.
//
// inFlightWrites are ordered by Key and then by Sequence. Two inFlightWrites
// with the same Key but different Sequences are not considered equal and are
// maintained separately in the inFlightWritesSet.
// inFlightWrites are ordered by Key, then by Sequence, then by Strength. Two
// inFlightWrites with the same Key but different Sequences and/or Strengths are
// not considered equal and are maintained separately in the inFlightWritesSet.
func (a *inFlightWrite) Less(bItem btree.Item) bool {
b := bItem.(*inFlightWrite)
kCmp := a.Key.Compare(b.Key)
Expand All @@ -956,6 +1000,10 @@ func (a *inFlightWrite) Less(bItem btree.Item) bool {
// Different Sequence.
return a.Sequence < b.Sequence
}
if a.Strength != b.Strength {
// Different Strength.
return a.Strength < b.Strength
}
// Equal.
return false
}
Expand All @@ -975,17 +1023,17 @@ type inFlightWriteSet struct {

// insert attempts to insert an in-flight write that has not been proven to have
// succeeded into the in-flight write set.
func (s *inFlightWriteSet) insert(key roachpb.Key, seq enginepb.TxnSeq) {
func (s *inFlightWriteSet) insert(key roachpb.Key, seq enginepb.TxnSeq, str lock.Strength) {
if s.t == nil {
// Lazily initialize btree.
s.t = btree.New(txnPipelinerBtreeDegree)
}

w := s.alloc.alloc(key, seq)
w := s.alloc.alloc(key, seq, str)
delItem := s.t.ReplaceOrInsert(w)
if delItem != nil {
// An in-flight write with the same key and sequence already existed in the
// set. This is unexpected, but we handle it.
// set. We replaced it with an identical in-flight write.
*delItem.(*inFlightWrite) = inFlightWrite{} // for GC
} else {
s.bytes += keySize(key)
Expand All @@ -994,14 +1042,14 @@ func (s *inFlightWriteSet) insert(key roachpb.Key, seq enginepb.TxnSeq) {

// remove attempts to remove an in-flight write from the in-flight write set.
// The method will be a no-op if the write was already proved.
func (s *inFlightWriteSet) remove(key roachpb.Key, seq enginepb.TxnSeq) {
func (s *inFlightWriteSet) remove(key roachpb.Key, seq enginepb.TxnSeq, str lock.Strength) {
if s.len() == 0 {
// Set is empty.
return
}

// Delete the write from the in-flight writes set.
s.tmp1 = makeInFlightWrite(key, seq)
s.tmp1 = makeInFlightWrite(key, seq, str)
delItem := s.t.Delete(&s.tmp1)
if delItem == nil {
// The write was already proven or the txn epoch was incremented.
Expand Down Expand Up @@ -1037,13 +1085,13 @@ func (s *inFlightWriteSet) ascendRange(start, end roachpb.Key, f func(w *inFligh
// Set is empty.
return
}
s.tmp1 = makeInFlightWrite(start, 0)
s.tmp1 = makeInFlightWrite(start, 0, 0)
if end == nil {
// Point lookup.
s.tmp2 = makeInFlightWrite(start, math.MaxInt32)
s.tmp2 = makeInFlightWrite(start, math.MaxInt32, 0)
} else {
// Range lookup.
s.tmp2 = makeInFlightWrite(end, 0)
s.tmp2 = makeInFlightWrite(end, 0, 0)
}
s.t.AscendRange(&s.tmp1, &s.tmp2, func(i btree.Item) bool {
f(i.(*inFlightWrite))
Expand Down Expand Up @@ -1093,9 +1141,11 @@ func (s *inFlightWriteSet) asSlice() []roachpb.SequencedWrite {
// amortizing the overhead of each allocation.
type inFlightWriteAlloc []inFlightWrite

// alloc allocates a new inFlightWrite with the specified key and sequence
// number.
func (a *inFlightWriteAlloc) alloc(key roachpb.Key, seq enginepb.TxnSeq) *inFlightWrite {
// alloc allocates a new inFlightWrite with the specified key, sequence number,
// and strength.
func (a *inFlightWriteAlloc) alloc(
key roachpb.Key, seq enginepb.TxnSeq, str lock.Strength,
) *inFlightWrite {
// If the current alloc slice has no extra capacity, reallocate a new chunk.
if cap(*a)-len(*a) == 0 {
const chunkAllocMinSize = 4
Expand All @@ -1112,7 +1162,7 @@ func (a *inFlightWriteAlloc) alloc(key roachpb.Key, seq enginepb.TxnSeq) *inFlig

*a = (*a)[:len(*a)+1]
w := &(*a)[len(*a)-1]
*w = makeInFlightWrite(key, seq)
*w = makeInFlightWrite(key, seq, str)
return w
}

Expand Down
Loading

0 comments on commit dec4830

Please sign in to comment.