diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go index af84ff920636..cc87c678a323 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go @@ -40,6 +40,12 @@ type savepoint struct { // seqNum represents the write seq num at the time the savepoint was created. // On rollback, it configures the txn to ignore all seqnums from this value // until the most recent seqnum. + // TODO(nvanbenschoten): this field is currently defined to be an exclusive + // lower bound, with the assumption that any writes performed after the + // savepoint is established will use a higher sequence number. This probably + // isn't working correctly with shared and exclusive lock acquisition, which + // don't increment the writeSeq. We should increment the writeSeq when a + // savepoint is established and then consider this an inclusive lower bound. seqNum enginepb.TxnSeq // txnSpanRefresher fields. diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go index 79a230ffaab7..8e386ab0bdf6 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go @@ -12,6 +12,7 @@ package kvcoord import ( "context" + "math" "sort" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -211,6 +212,10 @@ type txnPipeliner struct { // to have succeeded. They will need to be proven before the transaction // can commit. ifWrites inFlightWriteSet + // The in-flight writes chain index is used to uniquely identify calls to + // chainToInFlightWrites, so that each call can limit itself to adding a + // single QueryIntent request to the batch per overlapping in-flight write. + ifWritesChainIndex int64 // The transaction's lock footprint contains spans where locks (replicated // and unreplicated) have been acquired at some point by the transaction. // The span set contains spans encompassing the keys from all intent writes @@ -517,19 +522,25 @@ func (tp *txnPipeliner) chainToInFlightWrites(ba *kvpb.BatchRequest) *kvpb.Batch return ba } + // We may need to add QueryIntent requests to the batch. These variables are + // used to implement a copy-on-write scheme. forked := false oldReqs := ba.Requests - // TODO(nvanbenschoten): go 1.11 includes an optimization to quickly clear - // out an entire map. That might make it cost effective to maintain a single - // chainedKeys map between calls to this function. - var chainedKeys map[string]struct{} + + // We only want to add a single QueryIntent request to the BatchRequest per + // overlapping in-flight write. These counters allow us to accomplish this + // without a separate data structure. + tp.ifWritesChainIndex++ + chainIndex := tp.ifWritesChainIndex + chainCount := 0 + for i, ru := range oldReqs { req := ru.GetInner() // If we've chained onto all the in-flight writes (ifWrites.len() == - // len(chainedKeys)), we don't need to pile on more QueryIntents. So, only + // chainCount), we don't need to pile on more QueryIntents. So, only // do this work if that's not the case. - if tp.ifWrites.len() > len(chainedKeys) { + if tp.ifWrites.len() > chainCount { // For each conflicting in-flight write, add a QueryIntent request // to the batch to assert that it has succeeded and "chain" onto it. writeIter := func(w *inFlightWrite) { @@ -541,7 +552,7 @@ func (tp *txnPipeliner) chainToInFlightWrites(ba *kvpb.BatchRequest) *kvpb.Batch forked = true } - if _, ok := chainedKeys[string(w.Key)]; !ok { + if w.chainIndex != chainIndex { // The write has not already been chained onto by an earlier // request in this batch. Add a QueryIntent request to the // batch (before the conflicting request) to ensure that we @@ -557,11 +568,12 @@ func (tp *txnPipeliner) chainToInFlightWrites(ba *kvpb.BatchRequest) *kvpb.Batch }) // Record that the key has been chained onto at least once - // in this batch so that we don't chain onto it again. - if chainedKeys == nil { - chainedKeys = make(map[string]struct{}) - } - chainedKeys[string(w.Key)] = struct{}{} + // in this batch so that we don't chain onto it again. If + // we fail to prove the write exists for any reason, future + // requests will use a different chainIndex and will try to + // prove the write again. + w.chainIndex = chainIndex + chainCount++ } } @@ -910,11 +922,35 @@ func (tp *txnPipeliner) hasAcquiredLocks() bool { // number. type inFlightWrite struct { roachpb.SequencedWrite + // chainIndex is used to avoid chaining on to the same in-flight write + // multiple times in the same batch. Each index uniquely identifies a + // call to txnPipeliner.chainToInFlightWrites. + chainIndex int64 +} + +// makeInFlightWrite constructs an inFlightWrite. +func makeInFlightWrite(key roachpb.Key, seq enginepb.TxnSeq) inFlightWrite { + return inFlightWrite{SequencedWrite: roachpb.SequencedWrite{Key: key, Sequence: seq}} } // Less implements the btree.Item interface. -func (a *inFlightWrite) Less(b btree.Item) bool { - return a.Key.Compare(b.(*inFlightWrite).Key) < 0 +// +// inFlightWrites are ordered by Key and then by Sequence. Two inFlightWrites +// with the same Key but different Sequences are not considered equal and are +// maintained separately in the inFlightWritesSet. +func (a *inFlightWrite) Less(bItem btree.Item) bool { + b := bItem.(*inFlightWrite) + kCmp := a.Key.Compare(b.Key) + if kCmp != 0 { + // Different Keys. + return kCmp < 0 + } + if a.Sequence != b.Sequence { + // Different Sequence. + return a.Sequence < b.Sequence + } + // Equal. + return false } // inFlightWriteSet is an ordered set of in-flight point writes. Given a set @@ -931,60 +967,40 @@ type inFlightWriteSet struct { } // insert attempts to insert an in-flight write that has not been proven to have -// succeeded into the in-flight write set. If the write with an equal or larger -// sequence number already exists in the set, the method is a no-op. +// succeeded into the in-flight write set. func (s *inFlightWriteSet) insert(key roachpb.Key, seq enginepb.TxnSeq) { if s.t == nil { // Lazily initialize btree. s.t = btree.New(txnPipelinerBtreeDegree) } - s.tmp1.Key = key - item := s.t.Get(&s.tmp1) - if item != nil { - otherW := item.(*inFlightWrite) - if seq > otherW.Sequence { - // Existing in-flight write has old information. - otherW.Sequence = seq - } - return - } - w := s.alloc.alloc(key, seq) - s.t.ReplaceOrInsert(w) - s.bytes += keySize(key) + delItem := s.t.ReplaceOrInsert(w) + if delItem != nil { + // An in-flight write with the same key and sequence already existed in the + // set. This is unexpected, but we handle it. + *delItem.(*inFlightWrite) = inFlightWrite{} // for GC + } else { + s.bytes += keySize(key) + } } // remove attempts to remove an in-flight write from the in-flight write set. -// The method will be a no-op if the write was already proved. Care is taken -// not to accidentally remove a write to the same key but at a later epoch or -// sequence number. +// The method will be a no-op if the write was already proved. func (s *inFlightWriteSet) remove(key roachpb.Key, seq enginepb.TxnSeq) { if s.len() == 0 { // Set is empty. return } - s.tmp1.Key = key - item := s.t.Get(&s.tmp1) - if item == nil { + // Delete the write from the in-flight writes set. + s.tmp1 = makeInFlightWrite(key, seq) + delItem := s.t.Delete(&s.tmp1) + if delItem == nil { // The write was already proven or the txn epoch was incremented. return } - - w := item.(*inFlightWrite) - if seq < w.Sequence { - // The sequence might have changed, which means that a new write was - // sent to the same key. This write would have been forced to prove - // the existence of current write already. - return - } - - // Delete the write from the in-flight writes set. - delItem := s.t.Delete(item) - if delItem != nil { - *delItem.(*inFlightWrite) = inFlightWrite{} // for GC - } + *delItem.(*inFlightWrite) = inFlightWrite{} // for GC s.bytes -= keySize(key) // Assert that the byte accounting is believable. @@ -1014,20 +1030,18 @@ func (s *inFlightWriteSet) ascendRange(start, end roachpb.Key, f func(w *inFligh // Set is empty. return } + s.tmp1 = makeInFlightWrite(start, 0) if end == nil { // Point lookup. - s.tmp1.Key = start - if i := s.t.Get(&s.tmp1); i != nil { - f(i.(*inFlightWrite)) - } + s.tmp2 = makeInFlightWrite(start, math.MaxInt32) } else { // Range lookup. - s.tmp1.Key, s.tmp2.Key = start, end - s.t.AscendRange(&s.tmp1, &s.tmp2, func(i btree.Item) bool { - f(i.(*inFlightWrite)) - return true - }) + s.tmp2 = makeInFlightWrite(end, 0) } + s.t.AscendRange(&s.tmp1, &s.tmp2, func(i btree.Item) bool { + f(i.(*inFlightWrite)) + return true + }) } // len returns the number of the in-flight writes in the set. @@ -1091,9 +1105,7 @@ func (a *inFlightWriteAlloc) alloc(key roachpb.Key, seq enginepb.TxnSeq) *inFlig *a = (*a)[:len(*a)+1] w := &(*a)[len(*a)-1] - *w = inFlightWrite{ - SequencedWrite: roachpb.SequencedWrite{Key: key, Sequence: seq}, - } + *w = makeInFlightWrite(key, seq) return w } diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go index 1d2f2c8f74c8..d1409e006c47 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go @@ -217,8 +217,8 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) { incArgs := kvpb.IncrementRequest{RequestHeader: kvpb.RequestHeader{Key: keyC}} incArgs.Sequence = 4 ba.Add(&incArgs) - // Write at the same key as another write in the same batch. Will only - // result in a single in-flight write, at the larger sequence number. + // Write at the same key as another write in the same batch. Will result in + // two separate in-flight writes. delArgs := kvpb.DeleteRequest{RequestHeader: kvpb.RequestHeader{Key: keyC}} delArgs.Sequence = 5 ba.Add(&delArgs) @@ -256,7 +256,7 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) { require.IsType(t, &kvpb.IncrementResponse{}, br.Responses[2].GetInner()) require.IsType(t, &kvpb.DeleteResponse{}, br.Responses[3].GetInner()) require.Nil(t, pErr) - require.Equal(t, 3, tp.ifWrites.len()) + require.Equal(t, 4, tp.ifWrites.len()) wMin := tp.ifWrites.t.Min().(*inFlightWrite) require.Equal(t, cputArgs.Key, wMin.Key) @@ -277,29 +277,34 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) { ba.Add(&etArgs) mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { - require.Len(t, ba.Requests, 5) + require.Len(t, ba.Requests, 6) require.False(t, ba.AsyncConsensus) require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner()) require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[1].GetInner()) require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[2].GetInner()) require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[3].GetInner()) - require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[4].GetInner()) + require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[4].GetInner()) + require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[5].GetInner()) qiReq1 := ba.Requests[1].GetQueryIntent() qiReq2 := ba.Requests[2].GetQueryIntent() qiReq3 := ba.Requests[3].GetQueryIntent() + qiReq4 := ba.Requests[4].GetQueryIntent() require.Equal(t, keyA, qiReq1.Key) require.Equal(t, keyB, qiReq2.Key) require.Equal(t, keyC, qiReq3.Key) + require.Equal(t, keyC, qiReq4.Key) require.Equal(t, enginepb.TxnSeq(2), qiReq1.Txn.Sequence) require.Equal(t, enginepb.TxnSeq(3), qiReq2.Txn.Sequence) - require.Equal(t, enginepb.TxnSeq(5), qiReq3.Txn.Sequence) + require.Equal(t, enginepb.TxnSeq(4), qiReq3.Txn.Sequence) + require.Equal(t, enginepb.TxnSeq(5), qiReq4.Txn.Sequence) - etReq := ba.Requests[4].GetEndTxn() + etReq := ba.Requests[5].GetEndTxn() require.Equal(t, []roachpb.Span{{Key: keyA}}, etReq.LockSpans) expInFlight := []roachpb.SequencedWrite{ {Key: keyA, Sequence: 2}, {Key: keyB, Sequence: 3}, + {Key: keyC, Sequence: 4}, {Key: keyC, Sequence: 5}, {Key: keyD, Sequence: 6}, } @@ -311,13 +316,15 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) { // NOTE: expected response from a v23.1 node. // TODO(nvanbenschoten): update this case when v23.1 compatibility is no // longer required. - br.Responses[2].GetQueryIntent().FoundIntent = false + br.Responses[1].GetQueryIntent().FoundIntent = false br.Responses[1].GetQueryIntent().FoundUnpushedIntent = true // NOTE: expected responses from a v23.2 node. br.Responses[2].GetQueryIntent().FoundIntent = true br.Responses[2].GetQueryIntent().FoundUnpushedIntent = true br.Responses[3].GetQueryIntent().FoundIntent = true - br.Responses[2].GetQueryIntent().FoundUnpushedIntent = false + br.Responses[3].GetQueryIntent().FoundUnpushedIntent = false + br.Responses[4].GetQueryIntent().FoundIntent = true + br.Responses[4].GetQueryIntent().FoundUnpushedIntent = false return br, nil }) @@ -1647,11 +1654,10 @@ func TestTxnPipelinerSavepoints(t *testing.T) { s := savepoint{seqNum: enginepb.TxnSeq(12), active: true} tp.createSavepointLocked(ctx, &s) - // Some more writes after the savepoint. One of them is on key "c" that is - // part of the savepoint too, so we'll check that, upon rollback, the savepoint is - // updated to remove the lower-seq-num write to "c" that it was tracking as in-flight. + // Some more writes after the savepoint. tp.ifWrites.insert(roachpb.Key("c"), 13) tp.ifWrites.insert(roachpb.Key("d"), 14) + require.Equal(t, 5, tp.ifWrites.len()) require.Empty(t, tp.lockFootprint.asSlice()) // Now verify one of the writes. When we'll rollback to the savepoint below, @@ -1679,31 +1685,28 @@ func TestTxnPipelinerSavepoints(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) require.Equal(t, []roachpb.Span{{Key: roachpb.Key("a")}}, tp.lockFootprint.asSlice()) - require.Equal(t, 3, tp.ifWrites.len()) // We've verified one out of 4 writes. + require.Equal(t, 4, tp.ifWrites.len()) // We've verified one out of 5 writes. // Now restore the savepoint and check that the in-flight write state has been restored // and all rolled-back writes were moved to the lock footprint. tp.rollbackToSavepointLocked(ctx, s) // Check that the tracked inflight writes were updated correctly. The key that - // had been verified ("a") should have been taken out of the savepoint. Same - // for the "c", for which the pipeliner is now tracking a - // higher-sequence-number (which implies that it must have verified the lower - // sequence number write). + // had been verified ("a") should have been taken out of the savepoint. var ifWrites []inFlightWrite tp.ifWrites.ascend(func(w *inFlightWrite) { ifWrites = append(ifWrites, *w) }) require.Equal(t, []inFlightWrite{ - {roachpb.SequencedWrite{Key: roachpb.Key("b"), Sequence: 11}}, + {SequencedWrite: roachpb.SequencedWrite{Key: roachpb.Key("b"), Sequence: 11}}, + {SequencedWrite: roachpb.SequencedWrite{Key: roachpb.Key("c"), Sequence: 12}}, }, ifWrites) // Check that the footprint was updated correctly. In addition to the "a" // which it had before, it will also have "d" because it's not part of the - // savepoint. It will also have "c" since that's not an in-flight write any - // more (see above). + // savepoint. It will also have "c". require.Equal(t, []roachpb.Span{ {Key: roachpb.Key("a")}, @@ -1905,7 +1908,6 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) { largeAs[i] = 'a' } largeWrite := putBatch(largeAs, nil) - mediumWrite := putBatch(largeAs[:5], nil) lockingScanRequest := &kvpb.BatchRequest{} lockingScanRequest.Header.MaxSpanRequestKeys = 1 @@ -2015,17 +2017,6 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) { expRejectIdx: -1, maxSize: 10 + roachpb.SpanOverhead, }, - { - // Request keys overlap, so they don't count twice. - name: "overlapping requests", - reqs: []*kvpb.BatchRequest{mediumWrite, mediumWrite, mediumWrite}, - expRejectIdx: -1, - // Our estimation logic for rejecting requests based on size - // consults both the in-flight write set (which doesn't account for - // the span overhead) as well as the lock footprint (which accounts - // for the span overhead). - maxSize: 16 + roachpb.SpanOverhead, - }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) {