kv: prep in-flight write tracking for replicated locks #121065

Merged
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go
@@ -40,6 +40,12 @@ type savepoint struct {
// seqNum represents the write seq num at the time the savepoint was created.
// On rollback, it configures the txn to ignore all seqnums from this value
// until the most recent seqnum.
// TODO(nvanbenschoten): this field is currently defined to be an exclusive
// lower bound, with the assumption that any writes performed after the
// savepoint is established will use a higher sequence number. This probably
// isn't working correctly with shared and exclusive lock acquisition, which
// don't increment the writeSeq. We should increment the writeSeq when a
// savepoint is established and then consider this an inclusive lower bound.
seqNum enginepb.TxnSeq

// txnSpanRefresher fields.
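For context on the TODO above: on rollback, the savepoint's seqNum becomes the exclusive lower bound of an ignored-seqnum range. Below is a minimal in-package sketch of that semantic, under the assumption that the rollback path constructs the range roughly as shown; `ignoredRangeOnRollback` is a hypothetical helper, while `enginepb.IgnoredSeqNumRange` is the real type.

```go
// Hypothetical helper, for illustration only (not part of the PR). seqNum is
// treated as an exclusive lower bound, so only writes with Sequence > seqNum
// are rolled back. Lock acquisitions that do not bump the write sequence slip
// under this range, which is exactly the problem the TODO describes.
func ignoredRangeOnRollback(sp *savepoint, curWriteSeq enginepb.TxnSeq) enginepb.IgnoredSeqNumRange {
	return enginepb.IgnoredSeqNumRange{Start: sp.seqNum + 1, End: curWriteSeq}
}
```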
134 changes: 73 additions & 61 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
@@ -12,6 +12,7 @@ package kvcoord

import (
"context"
"math"
"sort"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -211,6 +212,10 @@ type txnPipeliner struct {
// to have succeeded. They will need to be proven before the transaction
// can commit.
ifWrites inFlightWriteSet
// The in-flight writes chain index is used to uniquely identify calls to
// chainToInFlightWrites, so that each call can limit itself to adding a
// single QueryIntent request to the batch per overlapping in-flight write.
ifWritesChainIndex int64
// The transaction's lock footprint contains spans where locks (replicated
// and unreplicated) have been acquired at some point by the transaction.
// The span set contains spans encompassing the keys from all intent writes
@@ -517,19 +522,25 @@ func (tp *txnPipeliner) chainToInFlightWrites(ba *kvpb.BatchRequest) *kvpb.Batch
return ba
}

// We may need to add QueryIntent requests to the batch. These variables are
// used to implement a copy-on-write scheme.
forked := false
oldReqs := ba.Requests
// TODO(nvanbenschoten): go 1.11 includes an optimization to quickly clear
// out an entire map. That might make it cost effective to maintain a single
// chainedKeys map between calls to this function.
var chainedKeys map[string]struct{}

// We only want to add a single QueryIntent request to the BatchRequest per
// overlapping in-flight write. These counters allow us to accomplish this
// without a separate data structure.
tp.ifWritesChainIndex++
chainIndex := tp.ifWritesChainIndex
chainCount := 0

for i, ru := range oldReqs {
req := ru.GetInner()

// If we've chained onto all the in-flight writes (ifWrites.len() ==
// len(chainedKeys)), we don't need to pile on more QueryIntents. So, only
// chainCount), we don't need to pile on more QueryIntents. So, only
// do this work if that's not the case.
if tp.ifWrites.len() > len(chainedKeys) {
if tp.ifWrites.len() > chainCount {
// For each conflicting in-flight write, add a QueryIntent request
// to the batch to assert that it has succeeded and "chain" onto it.
writeIter := func(w *inFlightWrite) {
@@ -541,7 +552,7 @@ func (tp *txnPipeliner) chainToInFlightWrites(ba *kvpb.BatchRequest) *kvpb.Batch
forked = true
}

if _, ok := chainedKeys[string(w.Key)]; !ok {
if w.chainIndex != chainIndex {
// The write has not already been chained onto by an earlier
// request in this batch. Add a QueryIntent request to the
// batch (before the conflicting request) to ensure that we
@@ -557,11 +568,12 @@ func (tp *txnPipeliner) chainToInFlightWrites(ba *kvpb.BatchRequest) *kvpb.Batch
})

// Record that the key has been chained onto at least once
// in this batch so that we don't chain onto it again.
if chainedKeys == nil {
chainedKeys = make(map[string]struct{})
}
chainedKeys[string(w.Key)] = struct{}{}
// in this batch so that we don't chain onto it again. If
// we fail to prove the write exists for any reason, future
// requests will use a different chainIndex and will try to
// prove the write again.
w.chainIndex = chainIndex
chainCount++
}
}

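An aside on the chainIndex/chainCount bookkeeping introduced above: each call to chainToInFlightWrites bumps a per-pipeliner counter and stamps every in-flight write it chains onto with that value, so a later request in the same batch that overlaps the same write sees a matching stamp and skips the duplicate QueryIntent. A standalone sketch of the pattern with simplified, hypothetical types (not the PR's code):

```go
package main

import "fmt"

// write and pipeliner are hypothetical stand-ins for inFlightWrite and
// txnPipeliner; they model only the chainIndex bookkeeping.
type write struct {
	key        string
	chainIndex int64 // stamp of the last chain() call that chained onto this write
}

type pipeliner struct {
	chainIndex int64
}

// chain adds at most one "QueryIntent" per overlapping in-flight write per
// call: the per-call index stamped onto each write replaces a per-call map
// of chained keys.
func (p *pipeliner) chain(reqs []string, overlapping func(string) []*write) []string {
	p.chainIndex++
	var out []string
	for _, r := range reqs {
		for _, w := range overlapping(r) {
			if w.chainIndex != p.chainIndex {
				out = append(out, "QueryIntent("+w.key+")")
				w.chainIndex = p.chainIndex
			}
		}
		out = append(out, r)
	}
	return out
}

func main() {
	a := &write{key: "a"}
	p := &pipeliner{}
	overlaps := func(req string) []*write {
		if req == "Put(a)" || req == "Del(a)" {
			return []*write{a}
		}
		return nil
	}
	// Two requests touch "a" in one batch; only the first gets a QueryIntent.
	fmt.Println(p.chain([]string{"Put(a)", "Del(a)", "Put(b)"}, overlaps))
	// Output: [QueryIntent(a) Put(a) Del(a) Put(b)]
}
```

If proving a write fails, nothing needs to be undone: the next call uses a fresh index, so stale stamps never suppress a retry, and no per-call map allocation is needed.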
@@ -910,11 +922,35 @@ func (tp *txnPipeliner) hasAcquiredLocks() bool {
// number.
type inFlightWrite struct {
roachpb.SequencedWrite
// chainIndex is used to avoid chaining on to the same in-flight write
// multiple times in the same batch. Each index uniquely identifies a
// call to txnPipeliner.chainToInFlightWrites.
chainIndex int64
}

// makeInFlightWrite constructs an inFlightWrite.
func makeInFlightWrite(key roachpb.Key, seq enginepb.TxnSeq) inFlightWrite {
return inFlightWrite{SequencedWrite: roachpb.SequencedWrite{Key: key, Sequence: seq}}
}

// Less implements the btree.Item interface.
func (a *inFlightWrite) Less(b btree.Item) bool {
return a.Key.Compare(b.(*inFlightWrite).Key) < 0
//
// inFlightWrites are ordered by Key and then by Sequence. Two inFlightWrites
// with the same Key but different Sequences are not considered equal and are
// maintained separately in the inFlightWriteSet.
func (a *inFlightWrite) Less(bItem btree.Item) bool {
b := bItem.(*inFlightWrite)
kCmp := a.Key.Compare(b.Key)
if kCmp != 0 {
// Different Keys.
return kCmp < 0
}
if a.Sequence != b.Sequence {
// Different Sequence.
return a.Sequence < b.Sequence
}
// Equal.
return false
}

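To make the new ordering concrete, here is an illustrative test-style snippet (assuming the kvcoord package context and its existing test imports; it is not part of the PR) showing how the comparator treats writes to the same key at different sequences:

```go
// Illustrative only: exercises the (Key, Sequence) ordering of Less.
func TestInFlightWriteLessOrderingSketch(t *testing.T) {
	w4 := makeInFlightWrite(roachpb.Key("c"), 4)
	w5 := makeInFlightWrite(roachpb.Key("c"), 5)
	wd := makeInFlightWrite(roachpb.Key("d"), 1)

	require.True(t, w4.Less(&w5))  // same key: Sequence 4 sorts before 5
	require.False(t, w5.Less(&w4)) // the reverse comparison is false
	require.False(t, w4.Less(&w4)) // equal key and sequence: neither is Less
	require.True(t, w5.Less(&wd))  // "c" < "d"; Sequence is only a tie-breaker
}
```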
// inFlightWriteSet is an ordered set of in-flight point writes. Given a set
Expand All @@ -931,60 +967,40 @@ type inFlightWriteSet struct {
}

// insert attempts to insert an in-flight write that has not been proven to have
// succeeded into the in-flight write set. If the write with an equal or larger
// sequence number already exists in the set, the method is a no-op.
// succeeded into the in-flight write set.
func (s *inFlightWriteSet) insert(key roachpb.Key, seq enginepb.TxnSeq) {
if s.t == nil {
// Lazily initialize btree.
s.t = btree.New(txnPipelinerBtreeDegree)
}

s.tmp1.Key = key
item := s.t.Get(&s.tmp1)
if item != nil {
otherW := item.(*inFlightWrite)
if seq > otherW.Sequence {
// Existing in-flight write has old information.
otherW.Sequence = seq
}
return
}

w := s.alloc.alloc(key, seq)
s.t.ReplaceOrInsert(w)
s.bytes += keySize(key)
delItem := s.t.ReplaceOrInsert(w)
if delItem != nil {
// An in-flight write with the same key and sequence already existed in the
// set. This is unexpected, but we handle it.
*delItem.(*inFlightWrite) = inFlightWrite{} // for GC
} else {
s.bytes += keySize(key)
}
}

// remove attempts to remove an in-flight write from the in-flight write set.
// The method will be a no-op if the write was already proved. Care is taken
// not to accidentally remove a write to the same key but at a later epoch or
// sequence number.
// The method will be a no-op if the write was already proved.
func (s *inFlightWriteSet) remove(key roachpb.Key, seq enginepb.TxnSeq) {
if s.len() == 0 {
// Set is empty.
return
}

s.tmp1.Key = key
item := s.t.Get(&s.tmp1)
if item == nil {
// Delete the write from the in-flight writes set.
s.tmp1 = makeInFlightWrite(key, seq)
delItem := s.t.Delete(&s.tmp1)
if delItem == nil {
// The write was already proven or the txn epoch was incremented.
return
}

w := item.(*inFlightWrite)
if seq < w.Sequence {
// The sequence might have changed, which means that a new write was
// sent to the same key. This write would have been forced to prove
// the existence of current write already.
return
}

// Delete the write from the in-flight writes set.
delItem := s.t.Delete(item)
if delItem != nil {
*delItem.(*inFlightWrite) = inFlightWrite{} // for GC
}
*delItem.(*inFlightWrite) = inFlightWrite{} // for GC
s.bytes -= keySize(key)

// Assert that the byte accounting is believable.
@@ -1014,20 +1030,18 @@ func (s *inFlightWriteSet) ascendRange(start, end roachpb.Key, f func(w *inFligh
// Set is empty.
return
}
s.tmp1 = makeInFlightWrite(start, 0)
if end == nil {
// Point lookup.
s.tmp1.Key = start
if i := s.t.Get(&s.tmp1); i != nil {
f(i.(*inFlightWrite))
}
s.tmp2 = makeInFlightWrite(start, math.MaxInt32)
} else {
// Range lookup.
s.tmp1.Key, s.tmp2.Key = start, end
s.t.AscendRange(&s.tmp1, &s.tmp2, func(i btree.Item) bool {
f(i.(*inFlightWrite))
return true
})
s.tmp2 = makeInFlightWrite(end, 0)
}
s.t.AscendRange(&s.tmp1, &s.tmp2, func(i btree.Item) bool {
f(i.(*inFlightWrite))
return true
})
}

// len returns the number of the in-flight writes in the set.
@@ -1091,9 +1105,7 @@ func (a *inFlightWriteAlloc) alloc(key roachpb.Key, seq enginepb.TxnSeq) *inFlig

*a = (*a)[:len(*a)+1]
w := &(*a)[len(*a)-1]
*w = inFlightWrite{
SequencedWrite: roachpb.SequencedWrite{Key: key, Sequence: seq},
}
*w = makeInFlightWrite(key, seq)
return w
}

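Tying the inFlightWriteSet changes together: writes to the same key at different sequence numbers now coexist in the set, remove deletes by exact (key, sequence), and a point lookup in ascendRange becomes a small scan bounded by (key, 0) and (key, math.MaxInt32). A test-style sketch of that behavior (assuming the kvcoord package context; the test itself is illustrative and not part of the PR):

```go
// Illustrative only: per-sequence tracking in inFlightWriteSet.
func TestInFlightWriteSetPerSequenceSketch(t *testing.T) {
	var s inFlightWriteSet
	s.insert(roachpb.Key("c"), 4)
	s.insert(roachpb.Key("c"), 5)
	s.insert(roachpb.Key("d"), 6)
	require.Equal(t, 3, s.len())

	// Point lookup (end == nil) scans [("c", 0), ("c", MaxInt32)) and visits
	// both sequences on key "c", in sequence order.
	var seqs []enginepb.TxnSeq
	s.ascendRange(roachpb.Key("c"), nil, func(w *inFlightWrite) {
		seqs = append(seqs, w.Sequence)
	})
	require.Equal(t, []enginepb.TxnSeq{4, 5}, seqs)

	// remove deletes by exact (key, sequence); the other write on "c" stays.
	s.remove(roachpb.Key("c"), 4)
	require.Equal(t, 2, s.len())
}
```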
55 changes: 23 additions & 32 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
@@ -217,8 +217,8 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) {
incArgs := kvpb.IncrementRequest{RequestHeader: kvpb.RequestHeader{Key: keyC}}
incArgs.Sequence = 4
ba.Add(&incArgs)
// Write at the same key as another write in the same batch. Will only
// result in a single in-flight write, at the larger sequence number.
// Write at the same key as another write in the same batch. Will result in
// two separate in-flight writes.
delArgs := kvpb.DeleteRequest{RequestHeader: kvpb.RequestHeader{Key: keyC}}
delArgs.Sequence = 5
ba.Add(&delArgs)
@@ -256,7 +256,7 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) {
require.IsType(t, &kvpb.IncrementResponse{}, br.Responses[2].GetInner())
require.IsType(t, &kvpb.DeleteResponse{}, br.Responses[3].GetInner())
require.Nil(t, pErr)
require.Equal(t, 3, tp.ifWrites.len())
require.Equal(t, 4, tp.ifWrites.len())

wMin := tp.ifWrites.t.Min().(*inFlightWrite)
require.Equal(t, cputArgs.Key, wMin.Key)
@@ -277,29 +277,34 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) {
ba.Add(&etArgs)

mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 5)
require.Len(t, ba.Requests, 6)
require.False(t, ba.AsyncConsensus)
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[1].GetInner())
require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[2].GetInner())
require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[3].GetInner())
require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[4].GetInner())
require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[4].GetInner())
require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[5].GetInner())

qiReq1 := ba.Requests[1].GetQueryIntent()
qiReq2 := ba.Requests[2].GetQueryIntent()
qiReq3 := ba.Requests[3].GetQueryIntent()
qiReq4 := ba.Requests[4].GetQueryIntent()
require.Equal(t, keyA, qiReq1.Key)
require.Equal(t, keyB, qiReq2.Key)
require.Equal(t, keyC, qiReq3.Key)
require.Equal(t, keyC, qiReq4.Key)
require.Equal(t, enginepb.TxnSeq(2), qiReq1.Txn.Sequence)
require.Equal(t, enginepb.TxnSeq(3), qiReq2.Txn.Sequence)
require.Equal(t, enginepb.TxnSeq(5), qiReq3.Txn.Sequence)
require.Equal(t, enginepb.TxnSeq(4), qiReq3.Txn.Sequence)
require.Equal(t, enginepb.TxnSeq(5), qiReq4.Txn.Sequence)

etReq := ba.Requests[4].GetEndTxn()
etReq := ba.Requests[5].GetEndTxn()
require.Equal(t, []roachpb.Span{{Key: keyA}}, etReq.LockSpans)
expInFlight := []roachpb.SequencedWrite{
{Key: keyA, Sequence: 2},
{Key: keyB, Sequence: 3},
{Key: keyC, Sequence: 4},
{Key: keyC, Sequence: 5},
{Key: keyD, Sequence: 6},
}
@@ -311,13 +316,15 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) {
// NOTE: expected response from a v23.1 node.
// TODO(nvanbenschoten): update this case when v23.1 compatibility is no
// longer required.
br.Responses[2].GetQueryIntent().FoundIntent = false
br.Responses[1].GetQueryIntent().FoundIntent = false
br.Responses[1].GetQueryIntent().FoundUnpushedIntent = true
// NOTE: expected responses from a v23.2 node.
br.Responses[2].GetQueryIntent().FoundIntent = true
br.Responses[2].GetQueryIntent().FoundUnpushedIntent = true
br.Responses[3].GetQueryIntent().FoundIntent = true
br.Responses[2].GetQueryIntent().FoundUnpushedIntent = false
br.Responses[3].GetQueryIntent().FoundUnpushedIntent = false
br.Responses[4].GetQueryIntent().FoundIntent = true
br.Responses[4].GetQueryIntent().FoundUnpushedIntent = false
return br, nil
})

@@ -1647,11 +1654,10 @@ func TestTxnPipelinerSavepoints(t *testing.T) {
s := savepoint{seqNum: enginepb.TxnSeq(12), active: true}
tp.createSavepointLocked(ctx, &s)

// Some more writes after the savepoint. One of them is on key "c" that is
// part of the savepoint too, so we'll check that, upon rollback, the savepoint is
// updated to remove the lower-seq-num write to "c" that it was tracking as in-flight.
// Some more writes after the savepoint.
tp.ifWrites.insert(roachpb.Key("c"), 13)
tp.ifWrites.insert(roachpb.Key("d"), 14)
require.Equal(t, 5, tp.ifWrites.len())
require.Empty(t, tp.lockFootprint.asSlice())

// Now verify one of the writes. When we'll rollback to the savepoint below,
@@ -1679,31 +1685,28 @@ func TestTxnPipelinerSavepoints(t *testing.T) {
require.Nil(t, pErr)
require.NotNil(t, br)
require.Equal(t, []roachpb.Span{{Key: roachpb.Key("a")}}, tp.lockFootprint.asSlice())
require.Equal(t, 3, tp.ifWrites.len()) // We've verified one out of 4 writes.
require.Equal(t, 4, tp.ifWrites.len()) // We've verified one out of 5 writes.

// Now restore the savepoint and check that the in-flight write state has been restored
// and all rolled-back writes were moved to the lock footprint.
tp.rollbackToSavepointLocked(ctx, s)

// Check that the tracked inflight writes were updated correctly. The key that
// had been verified ("a") should have been taken out of the savepoint. Same
// for the "c", for which the pipeliner is now tracking a
// higher-sequence-number (which implies that it must have verified the lower
// sequence number write).
// had been verified ("a") should have been taken out of the savepoint.
var ifWrites []inFlightWrite
tp.ifWrites.ascend(func(w *inFlightWrite) {
ifWrites = append(ifWrites, *w)
})
require.Equal(t,
[]inFlightWrite{
{roachpb.SequencedWrite{Key: roachpb.Key("b"), Sequence: 11}},
{SequencedWrite: roachpb.SequencedWrite{Key: roachpb.Key("b"), Sequence: 11}},
{SequencedWrite: roachpb.SequencedWrite{Key: roachpb.Key("c"), Sequence: 12}},
},
ifWrites)

// Check that the footprint was updated correctly. In addition to the "a"
// which it had before, it will also have "d" because it's not part of the
// savepoint. It will also have "c" since that's not an in-flight write any
// more (see above).
// savepoint. It will also have "c".
require.Equal(t,
[]roachpb.Span{
{Key: roachpb.Key("a")},
@@ -1905,7 +1908,6 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
largeAs[i] = 'a'
}
largeWrite := putBatch(largeAs, nil)
mediumWrite := putBatch(largeAs[:5], nil)

lockingScanRequest := &kvpb.BatchRequest{}
lockingScanRequest.Header.MaxSpanRequestKeys = 1
@@ -2015,17 +2017,6 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
expRejectIdx: -1,
maxSize: 10 + roachpb.SpanOverhead,
},
{
// Request keys overlap, so they don't count twice.
name: "overlapping requests",
reqs: []*kvpb.BatchRequest{mediumWrite, mediumWrite, mediumWrite},
expRejectIdx: -1,
// Our estimation logic for rejecting requests based on size
// consults both the in-flight write set (which doesn't account for
// the span overhead) as well as the lock footprint (which accounts
// for the span overhead).
maxSize: 16 + roachpb.SpanOverhead,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {