From ce93193004b508c2f7821fdde5466bb9f91f39f3 Mon Sep 17 00:00:00 2001
From: Arul Ajmani
Date: Tue, 5 Mar 2024 18:27:02 -0500
Subject: [PATCH] kv: allow DeleteRangeRequests to be pipelined

Previously, ranged requests could not be pipelined. However, there is
no good reason not to allow them to be pipelined -- we just have to
take extra care to correctly update in-flight writes tracking on the
response path. We do so now.

As part of this patch, we introduce two new flags -- canPipeline and
canParallelCommit. We use these flags to determine whether batches can
be pipelined or committed using parallel commits. This is in contrast
to before, where we derived this information from other flags
(isIntentWrite, !isRange). This wasn't strictly necessary for this
change, but it helps clean up the concepts.

As a consequence of this change, we now have a distinction between
requests that can be pipelined and requests that can be part of a batch
that is committed in parallel. Notably, this applies to
DeleteRangeRequests -- they can be pipelined, but not committed in
parallel. That's because we need to have the entire write set up front
when performing a parallel commit, in case we need to perform
recovery -- we don't have this for DeleteRange requests.

In the future, we'll extend the concept of canPipeline (and
!canParallelCommit) to other locking ranged requests as well -- in
particular, to (replicated) locking {,Reverse}ScanRequests that want to
pipeline their lock acquisitions.

Closes #64723
Informs #117978

Release note: None
---
 .../kvcoord/dist_sender_server_test.go        |   6 +
 .../kvclient/kvcoord/txn_coord_sender_test.go | 181 ++++++++++++++---
 .../kvcoord/txn_interceptor_committer.go      |  25 +--
 .../kvcoord/txn_interceptor_committer_test.go |   2 +-
 .../kvcoord/txn_interceptor_pipeliner.go      |  48 +++--
 .../kvcoord/txn_interceptor_pipeliner_test.go | 189 ++++++++++++++++--
 pkg/kv/kvpb/api.go                            |  62 +++++-
 pkg/kv/kvserver/replica_batch_updates.go      |   8 +-
 pkg/server/intent_test.go                     |  10 +-
 9 files changed, 441 insertions(+), 90 deletions(-)

diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
index cbfabd0fff64..c5f1878a3b08 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -2228,6 +2228,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 		},
 		{
 			name: "forwarded timestamp with delete range",
+			beforeTxnStart: func(ctx context.Context, db *kv.DB) error {
+				return db.Put(ctx, "a", "put") // ensure DeleteRange is not a no-op
+			},
 			afterTxnStart: func(ctx context.Context, db *kv.DB) error {
 				_, err := db.Get(ctx, "a") // read key to set ts cache
 				return err
@@ -2552,6 +2555,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			name: "forwarded timestamp with too many refreshes in batch commit " +
 				"with refresh",
 			refreshSpansCondenseFilter: disableCondensingRefreshSpans,
+			beforeTxnStart: func(ctx context.Context, db *kv.DB) error {
+				return db.Put(ctx, "a", "put") // ensure DeleteRange is not a no-op
+			},
 			afterTxnStart: func(ctx context.Context, db *kv.DB) error {
 				_, err := db.Get(ctx, "a") // set ts cache
 				return err
diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
index fff099a1466e..77f47d0e9762 100644
--- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
+++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
@@ -1166,14 +1166,19 @@ func TestTxnCoordSenderNoDuplicateLockSpans(t *testing.T) {
 	b.GetForUpdate(roachpb.Key("n"), 
kvpb.GuaranteedDurability) b.ReverseScanForShare(roachpb.Key("v"), roachpb.Key("z"), kvpb.GuaranteedDurability) - // The expected locks are a-b, c, m, n, and u-z. + // The expected locks are a-b, c, m, n, and v-z. + // + // A note about the v-z span -- because the DeleteRange request did not + // actually delete any keys, we'll not track anything for it in the lock + // footprint for this transaction. The v-z range comes from the + // ReverseScanForShare request in the final batch. expectedLockSpans = []roachpb.Span{ {Key: roachpb.Key("a"), EndKey: roachpb.Key("b").Next()}, {Key: roachpb.Key("c"), EndKey: nil}, {Key: roachpb.Key("d"), EndKey: nil}, {Key: roachpb.Key("m"), EndKey: nil}, {Key: roachpb.Key("n"), EndKey: nil}, - {Key: roachpb.Key("u"), EndKey: roachpb.Key("z")}, + {Key: roachpb.Key("v"), EndKey: roachpb.Key("z")}, } pErr = txn.CommitInBatch(ctx, b) @@ -1986,6 +1991,15 @@ func TestCommitMutatingTransaction(t *testing.T) { if !bytes.Equal(ba.Txn.Key, roachpb.Key("a")) { t.Errorf("expected transaction key to be \"a\"; got %s", ba.Txn.Key) } + + if _, ok := ba.GetArg(kvpb.DeleteRange); ok { + // Simulate deleting a single key for DeleteRange requests. Unlike other + // point writes, pipelined DeleteRange writes are tracked by looking at + // the batch response. + resp := br.Responses[0].GetInner() + resp.(*kvpb.DeleteRangeResponse).Keys = []roachpb.Key{roachpb.Key("a")} + } + if et, ok := ba.GetArg(kvpb.EndTxn); ok { if !et.(*kvpb.EndTxnRequest).Commit { t.Errorf("expected commit to be true") @@ -2009,61 +2023,178 @@ func TestCommitMutatingTransaction(t *testing.T) { testArgs := []struct { f func(ctx context.Context, txn *kv.Txn) error expMethod kvpb.Method - // pointWrite is set if the method is a "point write", which means that it - // will be pipelined and we should expect a QueryIntent request at commit - // time. - pointWrite bool + // All retryable functions below involve writing to exactly one key. The + // write will be pipelined if it's not in the same batch as the + // EndTxnRequest, in which case we expect a single QueryIntent request to be + // added when trying to commit the transaction. 
+ expQueryIntent bool }{ { - f: func(ctx context.Context, txn *kv.Txn) error { return txn.Put(ctx, "a", "b") }, - expMethod: kvpb.Put, - pointWrite: true, + f: func(ctx context.Context, txn *kv.Txn) error { return txn.Put(ctx, "a", "b") }, + expMethod: kvpb.Put, + expQueryIntent: true, + }, + { + f: func(ctx context.Context, txn *kv.Txn) error { + b := txn.NewBatch() + b.Put("a", "b") + return txn.CommitInBatch(ctx, b) + }, + expMethod: kvpb.Put, }, { - f: func(ctx context.Context, txn *kv.Txn) error { return txn.CPut(ctx, "a", "b", nil) }, - expMethod: kvpb.ConditionalPut, - pointWrite: true, + f: func(ctx context.Context, txn *kv.Txn) error { return txn.CPut(ctx, "a", "b", nil) }, + expMethod: kvpb.ConditionalPut, + expQueryIntent: true, + }, + { + f: func(ctx context.Context, txn *kv.Txn) error { + b := txn.NewBatch() + b.CPut("a", "b", nil) + return txn.CommitInBatch(ctx, b) + }, + expMethod: kvpb.ConditionalPut, }, { f: func(ctx context.Context, txn *kv.Txn) error { _, err := txn.Inc(ctx, "a", 1) return err }, - expMethod: kvpb.Increment, - pointWrite: true, + expMethod: kvpb.Increment, + expQueryIntent: true, + }, + { + f: func(ctx context.Context, txn *kv.Txn) error { + b := txn.NewBatch() + b.Inc("a", 1) + return txn.CommitInBatch(ctx, b) + }, + expMethod: kvpb.Increment, }, { f: func(ctx context.Context, txn *kv.Txn) error { _, err := txn.Del(ctx, "a") return err }, - expMethod: kvpb.Delete, - pointWrite: true, + expMethod: kvpb.Delete, + expQueryIntent: true, + }, + { + f: func(ctx context.Context, txn *kv.Txn) error { + b := txn.NewBatch() + b.Del("a") + return txn.CommitInBatch(ctx, b) + }, + expMethod: kvpb.Delete, }, { f: func(ctx context.Context, txn *kv.Txn) error { _, err := txn.DelRange(ctx, "a", "b", false /* returnKeys */) return err }, - expMethod: kvpb.DeleteRange, - pointWrite: false, + expMethod: kvpb.DeleteRange, + expQueryIntent: true, + }, + { + f: func(ctx context.Context, txn *kv.Txn) error { + b := txn.NewBatch() + b.DelRange("a", "b", false /* returnKeys */) + return txn.CommitInBatch(ctx, b) + }, + expMethod: kvpb.DeleteRange, + expQueryIntent: false, }, } - for i, test := range testArgs { + for _, test := range testArgs { t.Run(test.expMethod.String(), func(t *testing.T) { calls = nil db := kv.NewDB(log.MakeTestingAmbientCtxWithNewTracer(), factory, clock, stopper) - if err := db.Txn(ctx, test.f); err != nil { - t.Fatalf("%d: unexpected error on commit: %s", i, err) - } + require.NoError(t, db.Txn(ctx, test.f)) expectedCalls := []kvpb.Method{test.expMethod} - if test.pointWrite { + if test.expQueryIntent { expectedCalls = append(expectedCalls, kvpb.QueryIntent) } expectedCalls = append(expectedCalls, kvpb.EndTxn) - if !reflect.DeepEqual(expectedCalls, calls) { - t.Fatalf("%d: expected %s, got %s", i, expectedCalls, calls) + require.Equal(t, expectedCalls, calls) + }) + } +} + +// TestCommitNoopDeleteRangeTransaction ensures that committing a no-op +// DeleteRange transaction works correctly. In particular, the EndTxn request is +// elided, if possible. 
+func TestCommitNoopDeleteRangeTransaction(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + clock := hlc.NewClockForTesting(nil) + ambient := log.MakeTestingAmbientCtxWithNewTracer() + sender := &mockSender{} + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + var calls []kvpb.Method + sender.match(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + br := ba.CreateReply() + br.Txn = ba.Txn.Clone() + + calls = append(calls, ba.Methods()...) + if !bytes.Equal(ba.Txn.Key, roachpb.Key("a")) { + t.Errorf("expected transaction key to be \"a\"; got %s", ba.Txn.Key) + } + if et, ok := ba.GetArg(kvpb.EndTxn); ok { + if !et.(*kvpb.EndTxnRequest).Commit { + t.Errorf("expected commit to be true") + } + br.Txn.Status = roachpb.COMMITTED + } + + // Don't return any keys in the DeleteRange response, which simulates what a + // no-op DeleteRange looks like from the client's perspective. + return br, nil + }) + + factory := kvcoord.NewTxnCoordSenderFactory( + kvcoord.TxnCoordSenderFactoryConfig{ + AmbientCtx: ambient, + Clock: clock, + Stopper: stopper, + Settings: cluster.MakeTestingClusterSettings(), + }, + sender, + ) + + testCases := []struct { + f func(ctx context.Context, txn *kv.Txn) error + elideEndTxn bool + }{ + { + f: func(ctx context.Context, txn *kv.Txn) error { + _, err := txn.DelRange(ctx, "a", "d", true /* returnKeys */) + return err + }, + elideEndTxn: true, + }, + { + f: func(ctx context.Context, txn *kv.Txn) error { + b := txn.NewBatch() + b.DelRange("a", "b", true /* returnKeys */) + return txn.CommitInBatch(ctx, b) + }, + elideEndTxn: false, + }, + } + + for i, test := range testCases { + t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) { + calls = nil + db := kv.NewDB(log.MakeTestingAmbientCtxWithNewTracer(), factory, clock, stopper) + require.NoError(t, db.Txn(ctx, test.f)) + expectedCalls := []kvpb.Method{kvpb.DeleteRange} + if !test.elideEndTxn { + expectedCalls = append(expectedCalls, kvpb.EndTxn) } + require.Equal(t, expectedCalls, calls) }) } } diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go index 6ab5d8b26b50..282fc8587d08 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go @@ -153,7 +153,6 @@ func (tc *txnCommitter) SendLocked( if err := tc.validateEndTxnBatch(ba); err != nil { return nil, kvpb.NewError(err) } - // Determine whether we can elide the EndTxn entirely. We can do so if the // transaction is read-only, which we determine based on whether the EndTxn // request contains any writes. @@ -367,31 +366,21 @@ func (tc *txnCommitter) canCommitInParallel(ba *kvpb.BatchRequest, et *kvpb.EndT for _, ru := range ba.Requests[:len(ba.Requests)-1] { req := ru.GetInner() switch { - case kvpb.IsIntentWrite(req): - if kvpb.IsRange(req) { - // Similar to how we can't pipeline ranged writes, we also can't - // commit in parallel with them. The reason for this is that the - // status resolution process for STAGING transactions wouldn't - // know where to look for the corresponding intents. - return false - } - // All other point writes are included in the EndTxn request's - // InFlightWrites set and are visible to the status resolution - // process for STAGING transactions. Populating InFlightWrites - // has already been done by the txnPipeliner. + case kvpb.CanParallelCommit(req): + // The request can be part of a batch that is committed in parallel. 
case req.Method() == kvpb.QueryIntent: - // QueryIntent requests are compatable with parallel commits. The + // QueryIntent requests are compatible with parallel commits. The // intents being queried are also attached to the EndTxn request's // InFlightWrites set and are visible to the status resolution // process for STAGING transactions. Populating InFlightWrites has // already been done by the txnPipeliner. default: - // All other request types, notably Get and Scan requests, are - // incompatible with parallel commits because their outcome is - // not taken into consideration by the status resolution process - // for STAGING transactions. + // All other request types, notably Get, Scan and DeleteRange requests, + // are incompatible with parallel commits because their outcome is not + // taken into consideration by the status resolution process for STAGING + // transactions. return false } } diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go index b388777b7dbc..8065fda16ce5 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go @@ -72,7 +72,7 @@ func TestTxnCommitterElideEndTxn(t *testing.T) { mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { require.Len(t, ba.Requests, 2) require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner()) - require.IsType(t, &kvpb.PutRequest{}, ba.Requests[1].GetInner()) + require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[1].GetInner()) br := ba.CreateReply() br.Txn = ba.Txn diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go index acb9878e43fc..79a230ffaab7 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go @@ -459,18 +459,31 @@ func (tp *txnPipeliner) canUseAsyncConsensus(ctx context.Context, ba *kvpb.Batch for _, ru := range ba.Requests { req := ru.GetInner() - // Determine whether the current request prevents us from performing async - // consensus on the batch. - if !kvpb.IsIntentWrite(req) || kvpb.IsRange(req) { - // Only allow batches consisting of solely transactional point - // writes to perform consensus asynchronously. - // TODO(nvanbenschoten): We could allow batches with reads and point - // writes to perform async consensus, but this would be a bit - // tricky. Any read would need to chain on to any write that came - // before it in the batch and overlaps. For now, it doesn't seem - // worth it. + if req.Method() == kvpb.DeleteRange { + // Special handling for DeleteRangeRequests. + deleteRangeReq := req.(*kvpb.DeleteRangeRequest) + // We'll need the list of keys deleted to verify whether replication + // succeeded or not. Override ReturnKeys. + // + // NB: This means we'll return keys to the client even if it explicitly + // set this to false. If this proves to be a problem in practice, we can + // always add some tracking here and strip the response. Alternatively, we + // can disable DeleteRange pipelining entirely for requests that set this + // field to false. + // + // TODO(arul): Get rid of this flag entirely and always treat it as true. + // Now that we're overriding ReturnKeys here, the number of cases where + // this will be false are very few -- it's only when DeleteRange is part + // of the same batch as an EndTxn request. 
+ deleteRangeReq.ReturnKeys = true + } + + if !kvpb.CanPipeline(req) { + // The current request cannot be pipelined, so it prevents us from + // performing async consensus on the batch. return false } + // Inhibit async consensus if the batch would push us over the maximum // tracking memory budget. If we allowed async consensus on this batch, its // writes would need to be tracked precisely. By inhibiting async consensus, @@ -730,12 +743,17 @@ func (tp *txnPipeliner) updateLockTrackingInner( // Record any writes that were performed asynchronously. We'll // need to prove that these succeeded sometime before we commit. header := req.Header() - tp.ifWrites.insert(header.Key, header.Sequence) - // The request is not expected to be a ranged one, as we're only - // tracking one key in the ifWrites. Ranged requests do not admit - // ba.AsyncConsensus. if kvpb.IsRange(req) { - log.Fatalf(ctx, "unexpected range request with AsyncConsensus: %s", req) + switch req.Method() { + case kvpb.DeleteRange: + for _, key := range resp.(*kvpb.DeleteRangeResponse).Keys { + tp.ifWrites.insert(key, header.Sequence) + } + default: + log.Fatalf(ctx, "unexpected ranged request with AsyncConsensus: %s", req) + } + } else { + tp.ifWrites.insert(header.Key, header.Sequence) } } else { // If the lock acquisitions weren't performed asynchronously diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go index 5a84659960cd..1d2f2c8f74c8 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go @@ -522,9 +522,10 @@ func TestTxnPipelinerReads(t *testing.T) { require.Equal(t, 0, tp.ifWrites.len()) } -// TestTxnPipelinerRangedWrites tests that txnPipeliner will never perform -// ranged write operations using async consensus. It also tests that ranged -// writes will correctly chain on to existing in-flight writes. +// TestTxnPipelinerRangedWrites tests that the txnPipeliner can perform some +// ranged write operations using async consensus. In particular, ranged requests +// which have the canPipeline flag set. It also verifies that ranged writes will +// correctly chain on to existing in-flight writes. func TestTxnPipelinerRangedWrites(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -534,6 +535,7 @@ func TestTxnPipelinerRangedWrites(t *testing.T) { txn := makeTxnProto() keyA, keyD := roachpb.Key("a"), roachpb.Key("d") + // First, test DeleteRangeRequests which can be pipelined. ba := &kvpb.BatchRequest{} ba.Header = kvpb.Header{Txn: &txn} ba.Add(&kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}}) @@ -541,7 +543,7 @@ func TestTxnPipelinerRangedWrites(t *testing.T) { mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { require.Len(t, ba.Requests, 2) - require.False(t, ba.AsyncConsensus) + require.True(t, ba.AsyncConsensus) require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner()) require.IsType(t, &kvpb.DeleteRangeRequest{}, ba.Requests[1].GetInner()) @@ -553,8 +555,12 @@ func TestTxnPipelinerRangedWrites(t *testing.T) { br, pErr := tp.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - // The PutRequest was not run asynchronously, so it is not outstanding. - require.Equal(t, 0, tp.ifWrites.len()) + // The PutRequest was run asynchronously, so it has outstanding writes. 
+ require.Equal(t, 1, tp.ifWrites.len()) + + // Clear outstanding write added by the put request from the set of inflight + // writes before inserting new writes below. + tp.ifWrites.clear(true /* reuse */) // Add five keys into the in-flight writes set, one of which overlaps with // the Put request and two others which also overlap with the DeleteRange @@ -570,7 +576,7 @@ func TestTxnPipelinerRangedWrites(t *testing.T) { mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { require.Len(t, ba.Requests, 5) - require.False(t, ba.AsyncConsensus) + require.True(t, ba.AsyncConsensus) require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[0].GetInner()) require.IsType(t, &kvpb.PutRequest{}, ba.Requests[1].GetInner()) require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[2].GetInner()) @@ -604,6 +610,33 @@ func TestTxnPipelinerRangedWrites(t *testing.T) { br, pErr = tp.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) + // The put will be added to ifWrites, and the 2 from before that weren't + // covered by QueryIntent requests. + require.Equal(t, 3, tp.ifWrites.len()) + + // Now, test a non-locking Scan request, which cannot be pipelined. The scan + // overlaps with one of the keys in the in-flight write set, so we expect it + // to be chained on to the request. + ba = &kvpb.BatchRequest{} + ba.Header = kvpb.Header{Txn: &txn} + ba.Add(&kvpb.ScanRequest{RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyD}}) + + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 2) + require.False(t, ba.AsyncConsensus) + require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[0].GetInner()) + require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[1].GetInner()) + + br = ba.CreateReply() + br.Txn = ba.Txn + br.Responses[0].GetQueryIntent().FoundIntent = true + return br, nil + }) + + br, pErr = tp.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + // The 2 from before that weren't covered by QueryIntent requests. require.Equal(t, 2, tp.ifWrites.len()) } @@ -913,7 +946,7 @@ func TestTxnPipelinerIntentMissingError(t *testing.T) { t.Run(fmt.Sprintf("errIdx=%d", errIdx), func(t *testing.T) { mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { require.Len(t, ba.Requests, 7) - require.False(t, ba.AsyncConsensus) + require.True(t, ba.AsyncConsensus) require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[0].GetInner()) require.IsType(t, &kvpb.PutRequest{}, ba.Requests[1].GetInner()) require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[2].GetInner()) @@ -939,6 +972,98 @@ func TestTxnPipelinerIntentMissingError(t *testing.T) { } } +// TestTxnPipelinerDeleteRangeRequests ensures the txnPipeliner correctly +// decides whether to pipeline DeleteRangeRequests. In particular, it ensures +// DeleteRangeRequests can only be pipelined iff the batch doesn't contain an +// EndTxn request. 
+func TestTxnPipelinerDeleteRangeRequests(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+	tp, mockSender := makeMockTxnPipeliner(nil /* iter */)
+
+	txn := makeTxnProto()
+	keyA, keyB, keyC, keyD, keyE := roachpb.Key("a"),
+		roachpb.Key("b"), roachpb.Key("c"), roachpb.Key("d"), roachpb.Key("e")
+
+	ba := &kvpb.BatchRequest{}
+	ba.Header = kvpb.Header{Txn: &txn}
+	ba.Add(&kvpb.DeleteRangeRequest{
+		RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyD, Sequence: 7}, ReturnKeys: false},
+	)
+
+	mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+		require.Len(t, ba.Requests, 1)
+		require.True(t, ba.AsyncConsensus)
+		require.IsType(t, &kvpb.DeleteRangeRequest{}, ba.Requests[0].GetInner())
+		// The pipeliner should have overridden the ReturnKeys flag.
+		require.True(t, ba.Requests[0].GetInner().(*kvpb.DeleteRangeRequest).ReturnKeys)
+
+		br := ba.CreateReply()
+		br.Txn = ba.Txn
+		resp := br.Responses[0].GetInner()
+		resp.(*kvpb.DeleteRangeResponse).Keys = []roachpb.Key{keyA, keyB, keyD}
+		return br, nil
+	})
+
+	br, pErr := tp.SendLocked(ctx, ba)
+	require.Nil(t, pErr)
+	require.NotNil(t, br)
+	require.Equal(t, 3, tp.ifWrites.len())
+
+	// Now, create a batch which has (another) DeleteRangeRequest and an
+	// EndTxnRequest as well.
+	ba = &kvpb.BatchRequest{}
+	ba.Header = kvpb.Header{Txn: &txn}
+	ba.Add(&kvpb.DeleteRangeRequest{RequestHeader: kvpb.RequestHeader{Key: keyC, EndKey: keyE}})
+	ba.Add(&kvpb.EndTxnRequest{Commit: true})
+	mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+		require.Len(t, ba.Requests, 5)
+		require.False(t, ba.AsyncConsensus)
+		require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[0].GetInner())
+		require.IsType(t, &kvpb.DeleteRangeRequest{}, ba.Requests[1].GetInner())
+		require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[2].GetInner())
+		require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[3].GetInner())
+		require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[4].GetInner())
+
+		qiReq1 := ba.Requests[0].GetQueryIntent()
+		qiReq2 := ba.Requests[2].GetQueryIntent()
+		qiReq3 := ba.Requests[3].GetQueryIntent()
+		require.Equal(t, keyD, qiReq1.Key)
+		require.Equal(t, keyA, qiReq2.Key)
+		require.Equal(t, keyB, qiReq3.Key)
+		require.Equal(t, enginepb.TxnSeq(7), qiReq1.Txn.Sequence)
+		require.Equal(t, enginepb.TxnSeq(7), qiReq2.Txn.Sequence)
+		require.Equal(t, enginepb.TxnSeq(7), qiReq3.Txn.Sequence)
+
+		etReq := ba.Requests[4].GetEndTxn()
+		require.Equal(t, []roachpb.Span{{Key: keyC, EndKey: keyE}}, etReq.LockSpans)
+		expInFlight := []roachpb.SequencedWrite{
+			{Key: keyA, Sequence: 7},
+			{Key: keyB, Sequence: 7},
+			{Key: keyD, Sequence: 7},
+		}
+		require.Equal(t, expInFlight, etReq.InFlightWrites)
+
+		br = ba.CreateReply()
+		br.Txn = ba.Txn
+		br.Txn.Status = roachpb.COMMITTED
+		br.Responses[0].GetQueryIntent().FoundIntent = true
+		br.Responses[0].GetQueryIntent().FoundUnpushedIntent = true
+		br.Responses[2].GetQueryIntent().FoundIntent = true
+		br.Responses[2].GetQueryIntent().FoundUnpushedIntent = true
+		br.Responses[3].GetQueryIntent().FoundIntent = true
+		br.Responses[3].GetQueryIntent().FoundUnpushedIntent = true
+		return br, nil
+	})
+
+	br, pErr = tp.SendLocked(ctx, ba)
+	require.Nil(t, pErr)
+	require.NotNil(t, br)
+	require.Len(t, br.Responses, 2)        // QueryIntent response stripped
+	require.Equal(t, 0, tp.ifWrites.len()) // should be cleared out
+}
+
 // TestTxnPipelinerEnableDisableMixTxn tests that the txnPipeliner 
behaves // correctly if pipelining is enabled or disabled midway through a transaction. func TestTxnPipelinerEnableDisableMixTxn(t *testing.T) { @@ -1782,6 +1907,25 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) { largeWrite := putBatch(largeAs, nil) mediumWrite := putBatch(largeAs[:5], nil) + lockingScanRequest := &kvpb.BatchRequest{} + lockingScanRequest.Header.MaxSpanRequestKeys = 1 + lockingScanRequest.Add(&kvpb.ScanRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: roachpb.Key("a"), + EndKey: roachpb.Key("b"), + }, + KeyLockingStrength: lock.Exclusive, + KeyLockingDurability: lock.Replicated, + }) + lockingScanResp := lockingScanRequest.CreateReply() + lockingScanResp.Responses[0].GetInner().(*kvpb.ScanResponse).ResumeSpan = &roachpb.Span{ + Key: largeAs, + EndKey: roachpb.Key("b"), + } + + // DeleteRange requests are accounted for a bit differently than other ranged + // requests -- locks acquired by them are tracked more precisely by going + // through the actual keys deleted in the response. delRange := &kvpb.BatchRequest{} delRange.Header.MaxSpanRequestKeys = 1 delRange.Add(&kvpb.DeleteRangeRequest{ @@ -1789,12 +1933,10 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) { Key: roachpb.Key("a"), EndKey: roachpb.Key("b"), }, + ReturnKeys: true, }) delRangeResp := delRange.CreateReply() - delRangeResp.Responses[0].GetInner().(*kvpb.DeleteRangeResponse).ResumeSpan = &roachpb.Span{ - Key: largeAs, - EndKey: roachpb.Key("b"), - } + delRangeResp.Responses[0].GetInner().(*kvpb.DeleteRangeResponse).Keys = []roachpb.Key{largeAs} testCases := []struct { name string @@ -1836,7 +1978,26 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) { maxSize: 10 + roachpb.SpanOverhead, }, { - name: "response goes over budget, next request rejected", + name: "scan response goes over budget, next request rejected", + // A request returns a response with a large resume span, which takes up + // the budget. Then the next request will be rejected. + reqs: []*kvpb.BatchRequest{lockingScanRequest, putBatch(roachpb.Key("a"), nil)}, + resp: []*kvpb.BatchResponse{lockingScanResp}, + expRejectIdx: 1, + maxSize: 10 + roachpb.SpanOverhead, + }, + { + name: "scan response goes over budget", + // Like the previous test, except here we don't have a followup request + // once we're above budget. The test runner will commit the txn, and this + // test checks that committing is allowed. + reqs: []*kvpb.BatchRequest{lockingScanRequest}, + resp: []*kvpb.BatchResponse{lockingScanResp}, + expRejectIdx: -1, + maxSize: 10 + roachpb.SpanOverhead, + }, + { + name: "del range response goes over budget, next request rejected", // A request returns a response with a large resume span, which takes up // the budget. Then the next request will be rejected. reqs: []*kvpb.BatchRequest{delRange, putBatch(roachpb.Key("a"), nil)}, @@ -1845,7 +2006,7 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) { maxSize: 10 + roachpb.SpanOverhead, }, { - name: "response goes over budget", + name: "del range response goes over budget", // Like the previous test, except here we don't have a followup request // once we're above budget. The test runner will commit the txn, and this // test checks that committing is allowed. 
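The updateLockTracking change above is the heart of the pipelining support: for a pipelined point write the in-flight write set is populated from the request key, while for a pipelined DeleteRange it is populated from the Keys slice in the response, since only those keys actually had intents written for them. The standalone sketch below illustrates that split; the types and names here (write, inFlightWriteSet, trackWrites) are simplified stand-ins for illustration, not the real kvcoord implementation.

package main

import "fmt"

// write is a simplified stand-in for roachpb.SequencedWrite.
type write struct {
	key string
	seq int
}

// inFlightWriteSet is a simplified stand-in for the txnPipeliner's in-flight
// write tracking structure.
type inFlightWriteSet struct{ writes []write }

func (s *inFlightWriteSet) insert(key string, seq int) {
	s.writes = append(s.writes, write{key: key, seq: seq})
}

// trackWrites mirrors the idea in updateLockTrackingInner: point writes are
// tracked by their request key, while a DeleteRange is tracked by the keys
// reported in its response, because only those keys had intents written and
// therefore need to be proven before commit.
func trackWrites(s *inFlightWriteSet, isDeleteRange bool, reqKey string, respKeys []string, seq int) {
	if isDeleteRange {
		for _, k := range respKeys {
			s.insert(k, seq)
		}
		return
	}
	s.insert(reqKey, seq)
}

func main() {
	var s inFlightWriteSet
	trackWrites(&s, false, "a", nil, 7)              // point write: track the request key
	trackWrites(&s, true, "", []string{"b", "c"}, 7) // DeleteRange: track the deleted keys
	fmt.Println(s.writes)                            // [{a 7} {b 7} {c 7}]
}
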
diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index 35c8e2d81b38..daae1b60b120 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -75,15 +75,19 @@ const ( canSkipLocked // commands which can evaluate under the SkipLocked wait policy bypassesReplicaCircuitBreaker // commands which bypass the replica circuit breaker, i.e. opt out of fail-fast requiresClosedTSOlderThanStorageSnapshot // commands which read a replica's closed timestamp that is older than the state of the storage engine + canPipeline // commands which can be pipelined + canParallelCommit // commands which can be part of a parallel commit batch ) // flagDependencies specifies flag dependencies, asserted by TestFlagCombinations. var flagDependencies = map[flag][]flag{ - isAdmin: {isAlone}, - isLocking: {isTxn}, - isIntentWrite: {isWrite, isLocking}, - appliesTSCache: {isWrite}, - skipsLeaseCheck: {isAlone}, + isAdmin: {isAlone}, + isLocking: {isTxn}, + isIntentWrite: {isWrite, isLocking}, + canPipeline: {isIntentWrite}, + canParallelCommit: {canPipeline}, + appliesTSCache: {isWrite}, + skipsLeaseCheck: {isAlone}, } // flagExclusions specifies flag incompatibilities, asserted by TestFlagCombinations. @@ -180,6 +184,17 @@ func BypassesReplicaCircuitBreaker(args Request) bool { return (args.flags() & bypassesReplicaCircuitBreaker) != 0 } +// CanPipeline returns true iff the command can be pipelined. +func CanPipeline(args Request) bool { + return (args.flags() & canPipeline) != 0 +} + +// CanParallelCommit returns true iff the command can be part of a batch that is +// committed in parallel. +func CanParallelCommit(args Request) bool { + return (args.flags() & canParallelCommit) != 0 +} + // Request is an interface for RPC requests. type Request interface { protoutil.Message @@ -1578,7 +1593,8 @@ func (gr *GetRequest) flags() flag { } func (*PutRequest) flags() flag { - return isWrite | isTxn | isLocking | isIntentWrite | appliesTSCache | canBackpressure + return isWrite | isTxn | isLocking | isIntentWrite | appliesTSCache | canBackpressure | + canPipeline | canParallelCommit } // ConditionalPut effectively reads without writing if it hits a @@ -1588,7 +1604,8 @@ func (*PutRequest) flags() flag { // transaction to be retried at end transaction. func (*ConditionalPutRequest) flags() flag { return isRead | isWrite | isTxn | isLocking | isIntentWrite | - appliesTSCache | updatesTSCache | updatesTSCacheOnErr | canBackpressure + appliesTSCache | updatesTSCache | updatesTSCacheOnErr | canBackpressure | canPipeline | + canParallelCommit } // InitPut, like ConditionalPut, effectively reads without writing if it hits a @@ -1598,7 +1615,8 @@ func (*ConditionalPutRequest) flags() flag { // to be retried at end transaction. func (*InitPutRequest) flags() flag { return isRead | isWrite | isTxn | isLocking | isIntentWrite | - appliesTSCache | updatesTSCache | updatesTSCacheOnErr | canBackpressure + appliesTSCache | updatesTSCache | updatesTSCacheOnErr | canBackpressure | + canPipeline | canParallelCommit } // Increment reads the existing value, but always leaves an intent so @@ -1607,7 +1625,8 @@ func (*InitPutRequest) flags() flag { // error immediately instead of continuing a serializable transaction // to be retried at end transaction. 
func (*IncrementRequest) flags() flag { - return isRead | isWrite | isTxn | isLocking | isIntentWrite | appliesTSCache | canBackpressure + return isRead | isWrite | isTxn | isLocking | isIntentWrite | appliesTSCache | canBackpressure | + canPipeline | canParallelCommit } func (*DeleteRequest) flags() flag { @@ -1615,7 +1634,8 @@ func (*DeleteRequest) flags() flag { // an existing key was deleted at the read timestamp. isIntentWrite allows // omitting needsRefresh. For background, see: // https://github.com/cockroachdb/cockroach/pull/89375 - return isRead | isWrite | isTxn | isLocking | isIntentWrite | appliesTSCache | canBackpressure + return isRead | isWrite | isTxn | isLocking | isIntentWrite | appliesTSCache | canBackpressure | + canPipeline | canParallelCommit } func (drr *DeleteRangeRequest) flags() flag { @@ -1631,6 +1651,10 @@ func (drr *DeleteRangeRequest) flags() flag { // transaction by TxnCoordSender, which can occur if the command spans // multiple ranges. // + // As inline deletes cannot be part of a transaction, and don't go through the + // TxnCoordSender stack, there's no pipelining to speak of. As such, they + // don't set the canPipeline flag as well. + // // TODO(mrtracy): The behavior of DeleteRangeRequest with "inline" set has // likely diverged enough that it should be promoted into its own command. // However, it is complicated to plumb a new command through the system, @@ -1640,6 +1664,22 @@ func (drr *DeleteRangeRequest) flags() flag { if drr.Inline { return isRead | isWrite | isRange | isAlone } + + maybeCanPipeline := flag(0) + // DeleteRange requests operate over a range of keys. As such, we only + // know the actual keys that were deleted on the response path, not on the + // request path. This prevents them from being part of a batch that can be + // committed using parallel commits -- that's because for parallel commit + // recovery we need the entire in-flight write set to plop on the txn record, + // and we don't have that on the request path if the batch contains a + // DeleteRange request. + // + // We'll only know the actual keys that were deleted on the response path if + // they're returned to us. This is contingent on the ReturnKeys flag being set + // on the request. + if drr.ReturnKeys { + maybeCanPipeline = canPipeline + } // DeleteRange updates the timestamp cache as it doesn't leave intents or // tombstones for keys which don't yet exist or keys that already have // tombstones on them, but still wants to prevent anybody from writing under @@ -1647,7 +1687,7 @@ func (drr *DeleteRangeRequest) flags() flag { // that exist would not be lost (since the DeleteRange leaves intents on // those keys), but deletes of "empty space" would. return isRead | isWrite | isTxn | isLocking | isIntentWrite | isRange | - appliesTSCache | updatesTSCache | needsRefresh | canBackpressure + appliesTSCache | updatesTSCache | needsRefresh | canBackpressure | maybeCanPipeline } // Note that ClearRange commands cannot be part of a transaction as diff --git a/pkg/kv/kvserver/replica_batch_updates.go b/pkg/kv/kvserver/replica_batch_updates.go index a88989c3b75d..63705f695205 100644 --- a/pkg/kv/kvserver/replica_batch_updates.go +++ b/pkg/kv/kvserver/replica_batch_updates.go @@ -78,8 +78,8 @@ func maybeStripInFlightWrites(ba *kvpb.BatchRequest) (*kvpb.BatchRequest, error) for _, ru := range otherReqs { req := ru.GetInner() switch { - case kvpb.IsIntentWrite(req) && !kvpb.IsRange(req): - // Concurrent point write. 
+ case kvpb.CanParallelCommit(req): + // Concurrent point write being committed in parallel. writes++ case req.Method() == kvpb.QueryIntent: // Earlier pipelined point write that hasn't been proven yet. @@ -109,8 +109,8 @@ func maybeStripInFlightWrites(ba *kvpb.BatchRequest) (*kvpb.BatchRequest, error) req := ru.GetInner() seq := req.Header().Sequence switch { - case kvpb.IsIntentWrite(req) && !kvpb.IsRange(req): - // Concurrent point write. + case kvpb.CanParallelCommit(req): + // Concurrent point write being committed in parallel. case req.Method() == kvpb.QueryIntent: // Earlier pipelined point write that hasn't been proven yet. We // could remove from the in-flight writes set when we see these, diff --git a/pkg/server/intent_test.go b/pkg/server/intent_test.go index 83712575c7d1..1440d676688a 100644 --- a/pkg/server/intent_test.go +++ b/pkg/server/intent_test.go @@ -32,6 +32,11 @@ import ( ) // TODO(benesch): move this test to somewhere more specific than package server. +// TODO(arul): consider deleting this test in its entirety. It's quite old and +// testing too many things. It'll prove cumbersome when we precisely track +// locks acquired by scan requests and only perform point intent resolution for +// those keys in the future (instead of performing ranged intent resolution +// on the request's span). func TestIntentResolution(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -147,8 +152,9 @@ func TestIntentResolution(t *testing.T) { local := rnd.Intn(2) == 0 log.Infof(context.Background(), "%d: [%s,%s): local: %t", i, kr[0], kr[1], local) if local { - b.DelRange(kr[0], kr[1], false /* returnKeys */) - } else if _, err := txn.DelRange(ctx, kr[0], kr[1], false /* returnKeys */); err != nil { + b.ScanForUpdate(kr[0], kr[1], kvpb.BestEffort) + } else if _, err := txn.ScanForUpdate( + ctx, kr[0], kr[1], 0 /* maxRows */, kvpb.BestEffort); err != nil { return err } }
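
For a reader following the api.go changes, the flag relationship can be summarized as: canParallelCommit implies canPipeline, which implies isIntentWrite, and DeleteRange only earns canPipeline when ReturnKeys is set. The sketch below models that rule with a hypothetical bitmask type and helper (flag, deleteRangeFlags); it illustrates the dependency and is not the kvpb definitions.

package main

import "fmt"

// flag is a hypothetical stand-in for kvpb's request-flag bitmask.
type flag int

const (
	isIntentWrite     flag = 1 << iota // leaves intents that must be resolved
	isRange                            // operates over a span of keys
	canPipeline                        // may use async consensus; proven later via QueryIntent
	canParallelCommit                  // may appear in EndTxn.InFlightWrites of a STAGING txn
)

// deleteRangeFlags sketches the rule for DeleteRangeRequest: it is
// pipelineable only when the deleted keys come back in the response
// (ReturnKeys), and it is never parallel-commit eligible because its write
// set is unknown when the request is issued.
func deleteRangeFlags(returnKeys bool) flag {
	f := isIntentWrite | isRange
	if returnKeys {
		f |= canPipeline
	}
	return f
}

func main() {
	f := deleteRangeFlags(true)
	fmt.Println(f&canPipeline != 0)                       // true: can be pipelined
	fmt.Println(f&canParallelCommit != 0)                 // false: never parallel-commit eligible
	fmt.Println(deleteRangeFlags(false)&canPipeline != 0) // false: no keys returned, no pipelining
}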
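
The reason the write set must be known up front, as the commit message explains, is transaction status recovery: a STAGING transaction is implicitly committed only if every in-flight write listed on its record can be found. A minimal sketch of that invariant follows, again with hypothetical names rather than the real recovery code.

package main

import "fmt"

// implicitlyCommitted captures the invariant that STAGING-status recovery
// relies on: the transaction counts as committed only if every in-flight
// write promised on its record succeeded. A DeleteRange's writes are unknown
// until the request evaluates, so they could not be promised up front, which
// is why batches containing a DeleteRange fall back to a non-parallel commit.
func implicitlyCommitted(inFlightWrites []string, found map[string]bool) bool {
	for _, key := range inFlightWrites {
		if !found[key] {
			return false
		}
	}
	return true
}

func main() {
	record := []string{"a", "b"} // writes listed on the STAGING txn record
	fmt.Println(implicitlyCommitted(record, map[string]bool{"a": true, "b": true})) // true
	fmt.Println(implicitlyCommitted(record, map[string]bool{"a": true}))            // false: "b" unproven
}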