Skip to content

Commit

Permalink
kv: track in-flight writes with same key and different seq nums
Browse files Browse the repository at this point in the history
Informs #117978.

This commit simplifies the logic in `inFlightWriteSet` to track
in-flight writes with the same key but different sequence numbers
separately. This simplification is done to avoid confusion around
in-flight writes with different seq nums and/or different strengths
(which will be added shortly), and whether any of these in-flight
writes should imply that the others are no longer in-flight.

Release note: None
  • Loading branch information
nvanbenschoten committed Mar 25, 2024
1 parent 789f779 commit 629e830
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 80 deletions.
95 changes: 46 additions & 49 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvcoord

import (
"context"
"math"
"sort"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
Expand Down Expand Up @@ -909,9 +910,29 @@ type inFlightWrite struct {
chainIndex int64
}

// makeInFlightWrite builds an inFlightWrite value for the given key and
// sequence number. Only the embedded SequencedWrite is populated; every
// other field is left at its zero value.
func makeInFlightWrite(key roachpb.Key, seq enginepb.TxnSeq) inFlightWrite {
	var w inFlightWrite
	w.SequencedWrite = roachpb.SequencedWrite{Key: key, Sequence: seq}
	return w
}

// Less implements the btree.Item interface.
func (a *inFlightWrite) Less(b btree.Item) bool {
return a.Key.Compare(b.(*inFlightWrite).Key) < 0
//
// inFlightWrites are ordered by Key and then by Sequence. Two inFlightWrites
// with the same Key but different Sequences are not considered equal and are
// maintained separately in the inFlightWriteSet.
func (a *inFlightWrite) Less(bItem btree.Item) bool {
b := bItem.(*inFlightWrite)
kCmp := a.Key.Compare(b.Key)
if kCmp != 0 {
// Different Keys.
return kCmp < 0
}
if a.Sequence != b.Sequence {
// Different Sequence.
return a.Sequence < b.Sequence
}
// Equal.
return false
}

// inFlightWriteSet is an ordered set of in-flight point writes. Given a set
Expand All @@ -928,60 +949,40 @@ type inFlightWriteSet struct {
}

// insert attempts to insert an in-flight write that has not been proven to have
// succeeded into the in-flight write set. If the write with an equal or larger
// sequence number already exists in the set, the method is a no-op.
// succeeded into the in-flight write set.
func (s *inFlightWriteSet) insert(key roachpb.Key, seq enginepb.TxnSeq) {
if s.t == nil {
// Lazily initialize btree.
s.t = btree.New(txnPipelinerBtreeDegree)
}

s.tmp1.Key = key
item := s.t.Get(&s.tmp1)
if item != nil {
otherW := item.(*inFlightWrite)
if seq > otherW.Sequence {
// Existing in-flight write has old information.
otherW.Sequence = seq
}
return
}

w := s.alloc.alloc(key, seq)
s.t.ReplaceOrInsert(w)
s.bytes += keySize(key)
delItem := s.t.ReplaceOrInsert(w)
if delItem != nil {
// An in-flight write with the same key and sequence already existed in the
// set. This is unexpected, but we handle it.
*delItem.(*inFlightWrite) = inFlightWrite{} // for GC
} else {
s.bytes += keySize(key)
}
}

// remove attempts to remove an in-flight write from the in-flight write set.
// The method will be a no-op if the write was already proved. Care is taken
// not to accidentally remove a write to the same key but at a later epoch or
// sequence number.
// The method will be a no-op if the write was already proved.
func (s *inFlightWriteSet) remove(key roachpb.Key, seq enginepb.TxnSeq) {
if s.len() == 0 {
// Set is empty.
return
}

s.tmp1.Key = key
item := s.t.Get(&s.tmp1)
if item == nil {
// Delete the write from the in-flight writes set.
s.tmp1 = makeInFlightWrite(key, seq)
delItem := s.t.Delete(&s.tmp1)
if delItem == nil {
// The write was already proven or the txn epoch was incremented.
return
}

w := item.(*inFlightWrite)
if seq < w.Sequence {
// The sequence might have changed, which means that a new write was
// sent to the same key. This write would have been forced to prove
// the existence of current write already.
return
}

// Delete the write from the in-flight writes set.
delItem := s.t.Delete(item)
if delItem != nil {
*delItem.(*inFlightWrite) = inFlightWrite{} // for GC
}
*delItem.(*inFlightWrite) = inFlightWrite{} // for GC
s.bytes -= keySize(key)

// Assert that the byte accounting is believable.
Expand Down Expand Up @@ -1011,20 +1012,18 @@ func (s *inFlightWriteSet) ascendRange(start, end roachpb.Key, f func(w *inFligh
// Set is empty.
return
}
s.tmp1 = makeInFlightWrite(start, 0)
if end == nil {
// Point lookup.
s.tmp1.Key = start
if i := s.t.Get(&s.tmp1); i != nil {
f(i.(*inFlightWrite))
}
s.tmp2 = makeInFlightWrite(start, math.MaxInt32)
} else {
// Range lookup.
s.tmp1.Key, s.tmp2.Key = start, end
s.t.AscendRange(&s.tmp1, &s.tmp2, func(i btree.Item) bool {
f(i.(*inFlightWrite))
return true
})
s.tmp2 = makeInFlightWrite(end, 0)
}
s.t.AscendRange(&s.tmp1, &s.tmp2, func(i btree.Item) bool {
f(i.(*inFlightWrite))
return true
})
}

// len returns the number of in-flight writes in the set.
Expand Down Expand Up @@ -1088,9 +1087,7 @@ func (a *inFlightWriteAlloc) alloc(key roachpb.Key, seq enginepb.TxnSeq) *inFlig

*a = (*a)[:len(*a)+1]
w := &(*a)[len(*a)-1]
*w = inFlightWrite{
SequencedWrite: roachpb.SequencedWrite{Key: key, Sequence: seq},
}
*w = makeInFlightWrite(key, seq)
return w
}

Expand Down
53 changes: 22 additions & 31 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) {
incArgs := kvpb.IncrementRequest{RequestHeader: kvpb.RequestHeader{Key: keyC}}
incArgs.Sequence = 4
ba.Add(&incArgs)
// Write at the same key as another write in the same batch. Will only
// result in a single in-flight write, at the larger sequence number.
// Write at the same key as another write in the same batch. Will result in
// two separate in-flight writes.
delArgs := kvpb.DeleteRequest{RequestHeader: kvpb.RequestHeader{Key: keyC}}
delArgs.Sequence = 5
ba.Add(&delArgs)
Expand Down Expand Up @@ -256,7 +256,7 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) {
require.IsType(t, &kvpb.IncrementResponse{}, br.Responses[2].GetInner())
require.IsType(t, &kvpb.DeleteResponse{}, br.Responses[3].GetInner())
require.Nil(t, pErr)
require.Equal(t, 3, tp.ifWrites.len())
require.Equal(t, 4, tp.ifWrites.len())

wMin := tp.ifWrites.t.Min().(*inFlightWrite)
require.Equal(t, cputArgs.Key, wMin.Key)
Expand All @@ -277,29 +277,34 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) {
ba.Add(&etArgs)

mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 5)
require.Len(t, ba.Requests, 6)
require.False(t, ba.AsyncConsensus)
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[1].GetInner())
require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[2].GetInner())
require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[3].GetInner())
require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[4].GetInner())
require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[4].GetInner())
require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[5].GetInner())

qiReq1 := ba.Requests[1].GetQueryIntent()
qiReq2 := ba.Requests[2].GetQueryIntent()
qiReq3 := ba.Requests[3].GetQueryIntent()
qiReq4 := ba.Requests[4].GetQueryIntent()
require.Equal(t, keyA, qiReq1.Key)
require.Equal(t, keyB, qiReq2.Key)
require.Equal(t, keyC, qiReq3.Key)
require.Equal(t, keyC, qiReq4.Key)
require.Equal(t, enginepb.TxnSeq(2), qiReq1.Txn.Sequence)
require.Equal(t, enginepb.TxnSeq(3), qiReq2.Txn.Sequence)
require.Equal(t, enginepb.TxnSeq(5), qiReq3.Txn.Sequence)
require.Equal(t, enginepb.TxnSeq(4), qiReq3.Txn.Sequence)
require.Equal(t, enginepb.TxnSeq(5), qiReq4.Txn.Sequence)

etReq := ba.Requests[4].GetEndTxn()
etReq := ba.Requests[5].GetEndTxn()
require.Equal(t, []roachpb.Span{{Key: keyA}}, etReq.LockSpans)
expInFlight := []roachpb.SequencedWrite{
{Key: keyA, Sequence: 2},
{Key: keyB, Sequence: 3},
{Key: keyC, Sequence: 4},
{Key: keyC, Sequence: 5},
{Key: keyD, Sequence: 6},
}
Expand All @@ -311,13 +316,15 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) {
// NOTE: expected response from a v23.1 node.
// TODO(nvanbenschoten): update this case when v23.1 compatibility is no
// longer required.
br.Responses[2].GetQueryIntent().FoundIntent = false
br.Responses[1].GetQueryIntent().FoundIntent = false
br.Responses[1].GetQueryIntent().FoundUnpushedIntent = true
// NOTE: expected responses from a v23.2 node.
br.Responses[2].GetQueryIntent().FoundIntent = true
br.Responses[2].GetQueryIntent().FoundUnpushedIntent = true
br.Responses[3].GetQueryIntent().FoundIntent = true
br.Responses[2].GetQueryIntent().FoundUnpushedIntent = false
br.Responses[3].GetQueryIntent().FoundUnpushedIntent = false
br.Responses[4].GetQueryIntent().FoundIntent = true
br.Responses[4].GetQueryIntent().FoundUnpushedIntent = false
return br, nil
})

Expand Down Expand Up @@ -1522,11 +1529,10 @@ func TestTxnPipelinerSavepoints(t *testing.T) {
s := savepoint{seqNum: enginepb.TxnSeq(12), active: true}
tp.createSavepointLocked(ctx, &s)

// Some more writes after the savepoint. One of them is on key "c" that is
// part of the savepoint too, so we'll check that, upon rollback, the savepoint is
// updated to remove the lower-seq-num write to "c" that it was tracking as in-flight.
// Some more writes after the savepoint.
tp.ifWrites.insert(roachpb.Key("c"), 13)
tp.ifWrites.insert(roachpb.Key("d"), 14)
require.Equal(t, 5, tp.ifWrites.len())
require.Empty(t, tp.lockFootprint.asSlice())

// Now verify one of the writes. When we roll back to the savepoint below,
Expand Down Expand Up @@ -1554,31 +1560,28 @@ func TestTxnPipelinerSavepoints(t *testing.T) {
require.Nil(t, pErr)
require.NotNil(t, br)
require.Equal(t, []roachpb.Span{{Key: roachpb.Key("a")}}, tp.lockFootprint.asSlice())
require.Equal(t, 3, tp.ifWrites.len()) // We've verified one out of 4 writes.
require.Equal(t, 4, tp.ifWrites.len()) // We've verified one out of 5 writes.

// Now restore the savepoint and check that the in-flight write state has been restored
// and all rolled-back writes were moved to the lock footprint.
tp.rollbackToSavepointLocked(ctx, s)

// Check that the tracked inflight writes were updated correctly. The key that
// had been verified ("a") should have been taken out of the savepoint. Same
// for the "c", for which the pipeliner is now tracking a
// higher-sequence-number (which implies that it must have verified the lower
// sequence number write).
// had been verified ("a") should have been taken out of the savepoint.
var ifWrites []inFlightWrite
tp.ifWrites.ascend(func(w *inFlightWrite) {
ifWrites = append(ifWrites, *w)
})
require.Equal(t,
[]inFlightWrite{
{SequencedWrite: roachpb.SequencedWrite{Key: roachpb.Key("b"), Sequence: 11}},
{SequencedWrite: roachpb.SequencedWrite{Key: roachpb.Key("c"), Sequence: 12}},
},
ifWrites)

// Check that the footprint was updated correctly. In addition to the "a"
// which it had before, it will also have "d" because it's not part of the
// savepoint. It will also have "c" since that's not an in-flight write any
// more (see above).
// savepoint. It will also have "c".
require.Equal(t,
[]roachpb.Span{
{Key: roachpb.Key("a")},
Expand Down Expand Up @@ -1780,7 +1783,6 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
largeAs[i] = 'a'
}
largeWrite := putBatch(largeAs, nil)
mediumWrite := putBatch(largeAs[:5], nil)

delRange := &kvpb.BatchRequest{}
delRange.Header.MaxSpanRequestKeys = 1
Expand Down Expand Up @@ -1854,17 +1856,6 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
expRejectIdx: -1,
maxSize: 10 + roachpb.SpanOverhead,
},
{
// Request keys overlap, so they don't count twice.
name: "overlapping requests",
reqs: []*kvpb.BatchRequest{mediumWrite, mediumWrite, mediumWrite},
expRejectIdx: -1,
// Our estimation logic for rejecting requests based on size
// consults both the in-flight write set (which doesn't account for
// the span overhead) as well as the lock footprint (which accounts
// for the span overhead).
maxSize: 16 + roachpb.SpanOverhead,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
Expand Down

0 comments on commit 629e830

Please sign in to comment.