Skip to content

Commit

Permalink
kv: pipeline replicated lock acquisition
Browse files Browse the repository at this point in the history
Fixes cockroachdb#117978.

TODO: Needs testing.

This commit completes the client-side handling of replicated lock acquisition
pipelining. Replicated lock acquisition through Get, Scan, and ReverseScan
requests now qualifies to be pipelined. The `txnPipeliner` is updated to track
the strength associated with each in-flight write and pass that along to the
corresponding QueryIntentRequest.

Release note: None
  • Loading branch information
nvanbenschoten committed Mar 28, 2024
1 parent 078defb commit 3fb68c8
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 71 deletions.
98 changes: 74 additions & 24 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"math"
"sort"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -40,6 +41,21 @@ var PipelinedWritesEnabled = settings.RegisterBoolSetting(
true,
settings.WithName("kv.transaction.write_pipelining.enabled"),
)

var pipelinedRangedWritesEnabled = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"kv.transaction.write_pipelining.ranged_writes.enabled",
"if enabled, transactional ranged writes are pipelined through Raft consensus",
true,
)

var pipelinedLockingReadsEnabled = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"kv.transaction.write_pipelining.locking_reads.enabled",
"if enabled, transactional locking reads are pipelined through Raft consensus",
true,
)

var pipelinedWritesMaxBatchSize = settings.RegisterIntSetting(
settings.ApplicationLevel,
"kv.transaction.write_pipelining_max_batch_size",
Expand Down Expand Up @@ -416,7 +432,7 @@ func (tp *txnPipeliner) attachLocksToEndTxn(
// and forgo a parallel commit, but let's not break that abstraction
// boundary here.
if kvpb.IsIntentWrite(req) && !kvpb.IsRange(req) {
w := roachpb.SequencedWrite{Key: h.Key, Sequence: h.Sequence}
w := roachpb.SequencedWrite{Key: h.Key, Sequence: h.Sequence, Strength: lock.Intent}
et.InFlightWrites = append(et.InFlightWrites, w)
} else {
et.LockSpans = append(et.LockSpans, h.Span())
Expand All @@ -433,7 +449,7 @@ func (tp *txnPipeliner) attachLocksToEndTxn(
log.Infof(ctx, "intent: [%s,%s)", intent.Key, intent.EndKey)
}
for _, write := range et.InFlightWrites {
log.Infof(ctx, "in-flight: %d:%s", write.Sequence, write.Key)
log.Infof(ctx, "in-flight: %d:%s (%s)", write.Sequence, write.Key, write.Strength)
}
}
return ba, nil
Expand Down Expand Up @@ -500,6 +516,21 @@ func (tp *txnPipeliner) canUseAsyncConsensus(ctx context.Context, ba *kvpb.Batch
return false
}

if kvpb.IsRange(req) {
if !pipelinedRangedWritesEnabled.Get(&tp.st.SV) {
return false
}
}

if !kvpb.IsIntentWrite(req) {
if !pipelinedLockingReadsEnabled.Get(&tp.st.SV) {
return false
}
if !tp.st.Version.IsActive(ctx, clusterversion.V24_1_ReplicatedLockPipelining) {
return false
}
}

// Inhibit async consensus if the batch would push us over the maximum
// tracking memory budget. If we allowed async consensus on this batch, its
// writes would need to be tracked precisely. By inhibiting async consensus,
Expand All @@ -511,6 +542,11 @@ func (tp *txnPipeliner) canUseAsyncConsensus(ctx context.Context, ba *kvpb.Batch
// in that some writes may be proven by this batch and removed
// from the in-flight write set. The real accounting in
// inFlightWriteSet.{insert,remove} gets this right.
//
// TODO: this is interesting. We can't predict the additional in-flight
// writes that a ranged request will add. To our in-flight writes set. And
// once we perform async consensus, we can't merge them away until we prove
// that they have succeeded. What should we do?
addedIFBytes += keySize(req.Header().Key)
if (tp.ifWrites.byteSize() + addedIFBytes + tp.lockFootprint.bytes) > maxTrackingBytes {
log.VEventf(ctx, 2, "cannot perform async consensus because memory budget exceeded")
Expand Down Expand Up @@ -575,6 +611,8 @@ func (tp *txnPipeliner) chainToInFlightWrites(ba *kvpb.BatchRequest) *kvpb.Batch
Key: w.Key,
},
Txn: meta,
Strength: w.Strength,
IgnoredSeqNums: ba.Txn.IgnoredSeqNums,
ErrorIfMissing: true,
})

Expand Down Expand Up @@ -758,21 +796,25 @@ func (tp *txnPipeliner) updateLockTrackingInner(
// in the future.
qiResp := resp.(*kvpb.QueryIntentResponse)
if qiResp.FoundIntent || qiResp.FoundUnpushedIntent {
tp.ifWrites.remove(qiReq.Key, qiReq.Txn.Sequence)
tp.ifWrites.remove(qiReq.Key, qiReq.Txn.Sequence, qiReq.Strength)
// Move to lock footprint.
tp.lockFootprint.insert(roachpb.Span{Key: qiReq.Key})
}
} else if kvpb.IsLocking(req) {
// If the request intended to acquire locks, track its lock spans.
seq := req.Header().Sequence
str := lock.Intent
if readOnlyReq, ok := req.(kvpb.LockingReadRequest); ok {
str, _ = readOnlyReq.KeyLocking()
}
trackLocks := func(span roachpb.Span, _ lock.Durability) {
if ba.AsyncConsensus {
// Record any writes that were performed asynchronously. We'll
// need to prove that these succeeded sometime before we commit.
if span.EndKey != nil {
log.Fatalf(ctx, "unexpected multi-key intent pipelined")
}
tp.ifWrites.insert(span.Key, seq)
tp.ifWrites.insert(span.Key, seq, str)
} else {
// If the lock acquisitions weren't performed asynchronously
// then add them directly to our lock footprint.
Expand Down Expand Up @@ -853,7 +895,7 @@ func (tp *txnPipeliner) populateLeafInputState(tis *roachpb.LeafTxnInputState) {
func (tp *txnPipeliner) initializeLeaf(tis *roachpb.LeafTxnInputState) {
// Copy all in-flight writes into the inFlightWrite tree.
for _, w := range tis.InFlightWrites {
tp.ifWrites.insert(w.Key, w.Sequence)
tp.ifWrites.insert(w.Key, w.Sequence, w.Strength)
}
}

Expand Down Expand Up @@ -904,7 +946,7 @@ func (tp *txnPipeliner) rollbackToSavepointLocked(ctx context.Context, s savepoi
// been verified in the meantime) by removing all the extra ones.
if needCollecting {
for _, ifw := range writesToDelete {
tp.ifWrites.remove(ifw.Key, ifw.Sequence)
tp.ifWrites.remove(ifw.Key, ifw.Sequence, ifw.Strength)
}
} else {
tp.ifWrites.clear(true /* reuse */)
Expand All @@ -926,7 +968,7 @@ func (tp *txnPipeliner) hasAcquiredLocks() bool {

// inFlightWrites represent a commitment to proving (via QueryIntent) that
// a point write succeeded in replicating an intent with a specific sequence
// number.
// number and strength.
type inFlightWrite struct {
roachpb.SequencedWrite
// chainIndex is used to avoid chaining on to the same in-flight write
Expand All @@ -936,15 +978,17 @@ type inFlightWrite struct {
}

// makeInFlightWrite constructs an inFlightWrite.
func makeInFlightWrite(key roachpb.Key, seq enginepb.TxnSeq) inFlightWrite {
return inFlightWrite{SequencedWrite: roachpb.SequencedWrite{Key: key, Sequence: seq}}
func makeInFlightWrite(key roachpb.Key, seq enginepb.TxnSeq, str lock.Strength) inFlightWrite {
return inFlightWrite{SequencedWrite: roachpb.SequencedWrite{
Key: key, Sequence: seq, Strength: str,
}}
}

// Less implements the btree.Item interface.
//
// inFlightWrites are ordered by Key and then by Sequence. Two inFlightWrites
// with the same Key but different Sequences are not considered equal and are
// maintained separately in the inFlightWritesSet.
// inFlightWrites are ordered by Key, then by Sequence, then by Strength. Two
// inFlightWrites with the same Key but different Sequences and/or Strengths are
// not considered equal and are maintained separately in the inFlightWritesSet.
func (a *inFlightWrite) Less(bItem btree.Item) bool {
b := bItem.(*inFlightWrite)
kCmp := a.Key.Compare(b.Key)
Expand All @@ -956,6 +1000,10 @@ func (a *inFlightWrite) Less(bItem btree.Item) bool {
// Different Sequence.
return a.Sequence < b.Sequence
}
if a.Strength != b.Strength {
// Different Strength.
return a.Strength < b.Strength
}
// Equal.
return false
}
Expand All @@ -975,17 +1023,17 @@ type inFlightWriteSet struct {

// insert attempts to insert an in-flight write that has not been proven to have
// succeeded into the in-flight write set.
func (s *inFlightWriteSet) insert(key roachpb.Key, seq enginepb.TxnSeq) {
func (s *inFlightWriteSet) insert(key roachpb.Key, seq enginepb.TxnSeq, str lock.Strength) {
if s.t == nil {
// Lazily initialize btree.
s.t = btree.New(txnPipelinerBtreeDegree)
}

w := s.alloc.alloc(key, seq)
w := s.alloc.alloc(key, seq, str)
delItem := s.t.ReplaceOrInsert(w)
if delItem != nil {
// An in-flight write with the same key and sequence already existed in the
// set. This is unexpected, but we handle it.
// set. We replaced it with an identical in-flight write.
*delItem.(*inFlightWrite) = inFlightWrite{} // for GC
} else {
s.bytes += keySize(key)
Expand All @@ -994,14 +1042,14 @@ func (s *inFlightWriteSet) insert(key roachpb.Key, seq enginepb.TxnSeq) {

// remove attempts to remove an in-flight write from the in-flight write set.
// The method will be a no-op if the write was already proved.
func (s *inFlightWriteSet) remove(key roachpb.Key, seq enginepb.TxnSeq) {
func (s *inFlightWriteSet) remove(key roachpb.Key, seq enginepb.TxnSeq, str lock.Strength) {
if s.len() == 0 {
// Set is empty.
return
}

// Delete the write from the in-flight writes set.
s.tmp1 = makeInFlightWrite(key, seq)
s.tmp1 = makeInFlightWrite(key, seq, str)
delItem := s.t.Delete(&s.tmp1)
if delItem == nil {
// The write was already proven or the txn epoch was incremented.
Expand Down Expand Up @@ -1037,13 +1085,13 @@ func (s *inFlightWriteSet) ascendRange(start, end roachpb.Key, f func(w *inFligh
// Set is empty.
return
}
s.tmp1 = makeInFlightWrite(start, 0)
s.tmp1 = makeInFlightWrite(start, 0, 0)
if end == nil {
// Point lookup.
s.tmp2 = makeInFlightWrite(start, math.MaxInt32)
s.tmp2 = makeInFlightWrite(start, math.MaxInt32, 0)
} else {
// Range lookup.
s.tmp2 = makeInFlightWrite(end, 0)
s.tmp2 = makeInFlightWrite(end, 0, 0)
}
s.t.AscendRange(&s.tmp1, &s.tmp2, func(i btree.Item) bool {
f(i.(*inFlightWrite))
Expand Down Expand Up @@ -1093,9 +1141,11 @@ func (s *inFlightWriteSet) asSlice() []roachpb.SequencedWrite {
// amortizing the overhead of each allocation.
type inFlightWriteAlloc []inFlightWrite

// alloc allocates a new inFlightWrite with the specified key and sequence
// number.
func (a *inFlightWriteAlloc) alloc(key roachpb.Key, seq enginepb.TxnSeq) *inFlightWrite {
// alloc allocates a new inFlightWrite with the specified key, sequence number,
// and strength.
func (a *inFlightWriteAlloc) alloc(
key roachpb.Key, seq enginepb.TxnSeq, str lock.Strength,
) *inFlightWrite {
// If the current alloc slice has no extra capacity, reallocate a new chunk.
if cap(*a)-len(*a) == 0 {
const chunkAllocMinSize = 4
Expand All @@ -1112,7 +1162,7 @@ func (a *inFlightWriteAlloc) alloc(key roachpb.Key, seq enginepb.TxnSeq) *inFlig

*a = (*a)[:len(*a)+1]
w := &(*a)[len(*a)-1]
*w = makeInFlightWrite(key, seq)
*w = makeInFlightWrite(key, seq, str)
return w
}

Expand Down
Loading

0 comments on commit 3fb68c8

Please sign in to comment.