kv: allow DeleteRangeRequests to be pipelined
Previously, ranged requests could not be pipelined. However, there is no
good reason not to allow them to be pipelined -- we just have to take
extra care to correctly update in-flight write tracking on the response
path. We do so now.

As part of this patch, we introduce two new flags -- canPipeline and
canParallelCommit. We use these flags to determine whether batches can
be pipelined or committed using parallel commits. This is in contrast to
before, where we derived this information from other flags
(isIntentWrite, !isRange). This wasn't strictly necessary for this
change, but helps clean up the concepts.

As a consequence of this change, we now have a distinction between
requests that can be pipelined and requests that can be part of a batch
that is committed using parallel commits. Notably, this applies to
DeleteRangeRequests -- they can be pipelined, but they cannot be
committed in parallel. That's because a parallel commit requires the
entire write set upfront, in case transaction recovery needs to be
performed -- and for a DeleteRange request, the write set isn't known
until the request has evaluated.
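
To make the distinction concrete, here is a minimal, runnable Go sketch
of the two capability flags. The flag constants and the per-method table
below are simplified illustrations of the idea, not the actual kvpb flag
machinery:

package main

import "fmt"

// flag is a simplified stand-in for the request flag bitmask in kvpb.
type flag int

const (
	isIntentWrite     flag = 1 << iota // the request writes intents
	isRange                            // the request operates over a key span
	canPipeline                        // the request may use async consensus
	canParallelCommit                  // the request may join a parallel commit
)

// requestFlags models how capabilities could be declared per request type.
var requestFlags = map[string]flag{
	// Point writes can be pipelined and can join a parallel commit.
	"Put": isIntentWrite | canPipeline | canParallelCommit,
	// DeleteRange can now be pipelined, but its write set is only known
	// after evaluation, so it cannot join a parallel commit.
	"DeleteRange": isIntentWrite | isRange | canPipeline,
	// RefreshRange is ranged and non-locking: neither capability applies.
	"RefreshRange": isRange,
}

func main() {
	for _, m := range []string{"Put", "DeleteRange", "RefreshRange"} {
		f := requestFlags[m]
		fmt.Printf("%s: canPipeline=%t canParallelCommit=%t\n",
			m, f&canPipeline != 0, f&canParallelCommit != 0)
	}
}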

In the future, we'll extend the concept of canPipeline
(and !canParallelCommit) to other locking ranged requests as well -- in
particular, to (replicated) locking {,Reverse}ScanRequests that want to
pipeline their lock acquisitions.

Closes cockroachdb#64723
Informs cockroachdb#117978

Release note: None
arulajmani committed Mar 5, 2024
1 parent 534be29 commit 3d8ea25
Showing 5 changed files with 197 additions and 45 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/txn_coord_sender.go
@@ -492,7 +492,7 @@ func (tc *TxnCoordSender) Send(
return nil, pErr
}

-	if ba.IsSingleEndTxnRequest() && !tc.interceptorAlloc.txnPipeliner.hasAcquiredLocks() {
+	if ba.IsSingleEndTxnRequest() && !tc.interceptorAlloc.disableElideEndTxn {
return nil, tc.finalizeNonLockingTxnLocked(ctx, ba)
}

35 changes: 25 additions & 10 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go
@@ -130,19 +130,21 @@ var parallelCommitsEnabled = settings.RegisterBoolSetting(
// In all cases, the interceptor abstracts away the details of this from all
// interceptors above it in the coordinator interceptor stack.
type txnCommitter struct {
-	st         *cluster.Settings
-	stopper    *stop.Stopper
-	wrapped    lockedSender
-	metrics    *TxnMetrics
-	mu         sync.Locker
-	disable1PC bool
+	st                 *cluster.Settings
+	stopper            *stop.Stopper
+	wrapped            lockedSender
+	metrics            *TxnMetrics
+	mu                 sync.Locker
+	disable1PC         bool
+	disableElideEndTxn bool
}

// SendLocked implements the lockedSender interface.
func (tc *txnCommitter) SendLocked(
ctx context.Context, ba *kvpb.BatchRequest,
) (*kvpb.BatchResponse, *kvpb.Error) {
tc.maybeDisable1PC(ba)
+	tc.maybeDisableElideEndTxn(ba)
// If the batch does not include an EndTxn request, pass it through.
rArgs, hasET := ba.GetArg(kvpb.EndTxn)
if !hasET {
@@ -154,10 +156,8 @@ func (tc *txnCommitter) SendLocked(
return nil, kvpb.NewError(err)
}

-	// Determine whether we can elide the EndTxn entirely. We can do so if the
-	// transaction is read-only, which we determine based on whether the EndTxn
-	// request contains any writes.
-	if len(et.LockSpans) == 0 && len(et.InFlightWrites) == 0 {
+	// Elide the EndTxn if we're able to do so.
+	if !tc.disableElideEndTxn {
return tc.sendLockedWithElidedEndTxn(ctx, ba, et)
}

@@ -602,6 +602,21 @@ func (tc *txnCommitter) maybeDisable1PC(ba *kvpb.BatchRequest) {
}
}

+// maybeDisableElideEndTxn determines whether the supplied batch prevents us
+// from eliding the EndTxn request when the time comes. We can only elide EndTxn
+// requests if the transaction is read-only and does not acquire locks.
+func (tc *txnCommitter) maybeDisableElideEndTxn(ba *kvpb.BatchRequest) {
+	if tc.disableElideEndTxn {
+		return // already disabled; nothing to do
+	}
+	// Locks (including intent writes) acquired by a transaction need to be
+	// resolved after it finalizes. As a result, locking requests prevent us from
+	// eliding EndTxn requests (when the time comes to issue one).
+	if ba.IsLocking() {
+		tc.disableElideEndTxn = true
+	}
+}

// setWrapped implements the txnInterceptor interface.
func (tc *txnCommitter) setWrapped(wrapped lockedSender) { tc.wrapped = wrapped }

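
The net effect of the committer change: the elision decision is made
incrementally on the request path, rather than by inspecting the EndTxn at
commit time. Below is a toy model of that bookkeeping -- the types and method
are hypothetical stand-ins, not the kvcoord API:

package main

import "fmt"

// committer is a toy stand-in for txnCommitter; only the elision bit is modeled.
type committer struct{ disableElideEndTxn bool }

// batch is a toy stand-in for a BatchRequest.
type batch struct {
	isLocking bool // would be ba.IsLocking() in the real code
	hasEndTxn bool // whether the batch carries an EndTxn request
}

// send sketches SendLocked: the bit latches on the first locking batch and is
// consulted, instead of EndTxn.LockSpans/InFlightWrites, when EndTxn arrives.
func (c *committer) send(b batch) string {
	if b.isLocking {
		c.disableElideEndTxn = true // sticky for the rest of the transaction
	}
	if b.hasEndTxn && !c.disableElideEndTxn {
		return "EndTxn elided: transaction is read-only and lock-free"
	}
	return "batch sent to the wrapped sender"
}

func main() {
	var c committer
	fmt.Println(c.send(batch{}))                // read-only batch
	fmt.Println(c.send(batch{isLocking: true})) // locking write latches the bit
	fmt.Println(c.send(batch{hasEndTxn: true})) // commit: EndTxn can't be elided
}
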
46 changes: 31 additions & 15 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
@@ -459,18 +459,12 @@ func (tp *txnPipeliner) canUseAsyncConsensus(ctx context.Context, ba *kvpb.Batch
for _, ru := range ba.Requests {
req := ru.GetInner()

-		// Determine whether the current request prevents us from performing async
-		// consensus on the batch.
-		if !kvpb.IsIntentWrite(req) || kvpb.IsRange(req) {
-			// Only allow batches consisting of solely transactional point
-			// writes to perform consensus asynchronously.
-			// TODO(nvanbenschoten): We could allow batches with reads and point
-			// writes to perform async consensus, but this would be a bit
-			// tricky. Any read would need to chain on to any write that came
-			// before it in the batch and overlaps. For now, it doesn't seem
-			// worth it.
+		// The current request cannot be pipelined, so it prevents us from
+		// performing async consensus on the batch.
+		if !kvpb.CanPipeline(req) {
return false
}

// Inhibit async consensus if the batch would push us over the maximum
// tracking memory budget. If we allowed async consensus on this batch, its
// writes would need to be tracked precisely. By inhibiting async consensus,
@@ -487,6 +481,23 @@ func (tp *txnPipeliner) canUseAsyncConsensus(ctx context.Context, ba *kvpb.Batch
log.VEventf(ctx, 2, "cannot perform async consensus because memory budget exceeded")
return false
}

+		switch req.Method() {
+		case kvpb.DeleteRange:
+			// Special handling for DeleteRangeRequests.
+			deleteRangeReq := req.(*kvpb.DeleteRangeRequest)
+			// We'll need the list of keys deleted to verify whether replication
+			// succeeded or not. Override ReturnKeys.
+			//
+			// NB: This means we'll return keys to the client even if it explicitly
+			// set this to false. If this proves to be a problem in practice, we can
+			// always add some tracking here and strip the response. Alternatively, we
+			// can disable DeleteRange pipelining entirely for requests that set this
+			// field to false.
+			deleteRangeReq.ReturnKeys = true
+		default:
+			// Request can be pipelined; we don't need to do anything special.
+		}
}
return true
}
@@ -730,12 +741,17 @@ func (tp *txnPipeliner) updateLockTrackingInner(
// Record any writes that were performed asynchronously. We'll
// need to prove that these succeeded sometime before we commit.
header := req.Header()
-			tp.ifWrites.insert(header.Key, header.Sequence)
-			// The request is not expected to be a ranged one, as we're only
-			// tracking one key in the ifWrites. Ranged requests do not admit
-			// ba.AsyncConsensus.
			if kvpb.IsRange(req) {
-				log.Fatalf(ctx, "unexpected range request with AsyncConsensus: %s", req)
+				switch req.Method() {
+				case kvpb.DeleteRange:
+					for _, key := range resp.(*kvpb.DeleteRangeResponse).Keys {
+						tp.ifWrites.insert(key, header.Sequence)
+					}
+				default:
+					log.Fatalf(ctx, "unexpected ranged request with AsyncConsensus: %s", req)
+				}
+			} else {
+				tp.ifWrites.insert(header.Key, header.Sequence)
			}
} else {
// If the lock acquisitions weren't performed asynchronously
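
Putting the pipeliner pieces together: a pipelined DeleteRange runs with async
consensus and ReturnKeys forced on, the keys in its response are recorded as
in-flight writes, and those writes are later proven with QueryIntent before
the transaction commits. A self-contained sketch of that flow, using
hypothetical simplified types rather than the kvcoord API:

package main

import "fmt"

// inFlightWrite is a toy stand-in for the pipeliner's in-flight write entries.
type inFlightWrite struct {
	key string
	seq int
}

// pipeliner is a toy stand-in for txnPipeliner's in-flight write tracking.
type pipeliner struct{ ifWrites []inFlightWrite }

// recordDeleteRange mirrors the response-path bookkeeping: because ReturnKeys
// was forced to true, the response enumerates every key the request deleted,
// and each one becomes an in-flight write that must be proven before commit.
func (p *pipeliner) recordDeleteRange(respKeys []string, seq int) {
	for _, k := range respKeys {
		p.ifWrites = append(p.ifWrites, inFlightWrite{key: k, seq: seq})
	}
}

// proveBeforeCommit sketches the commit path: each tracked write is turned
// into a QueryIntent that must find an unpushed intent before EndTxn runs.
func (p *pipeliner) proveBeforeCommit() []string {
	queries := make([]string, 0, len(p.ifWrites))
	for _, w := range p.ifWrites {
		queries = append(queries, fmt.Sprintf("QueryIntent(%s@%d)", w.key, w.seq))
	}
	p.ifWrites = nil // proven writes are no longer tracked
	return queries
}

func main() {
	var p pipeliner
	// A DeleteRange over [a, d) ran with async consensus and reported the
	// keys it actually deleted.
	p.recordDeleteRange([]string{"a", "b", "c"}, 10)
	fmt.Println(p.proveBeforeCommit()) // [QueryIntent(a@10) QueryIntent(b@10) QueryIntent(c@10)]
}
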
109 changes: 100 additions & 9 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
@@ -522,9 +522,10 @@ func TestTxnPipelinerReads(t *testing.T) {
require.Equal(t, 0, tp.ifWrites.len())
}

-// TestTxnPipelinerRangedWrites tests that txnPipeliner will never perform
-// ranged write operations using async consensus. It also tests that ranged
-// writes will correctly chain on to existing in-flight writes.
+// TestTxnPipelinerRangedWrites tests that the txnPipeliner can perform some
+// ranged write operations using async consensus -- in particular, ranged
+// requests that have the canPipeline flag set. It also verifies that ranged
+// writes will correctly chain on to existing in-flight writes.
func TestTxnPipelinerRangedWrites(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -534,14 +535,15 @@ func TestTxnPipelinerRangedWrites(t *testing.T) {
txn := makeTxnProto()
keyA, keyD := roachpb.Key("a"), roachpb.Key("d")

+	// First, test DeleteRangeRequests, which can be pipelined.
ba := &kvpb.BatchRequest{}
ba.Header = kvpb.Header{Txn: &txn}
ba.Add(&kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}})
ba.Add(&kvpb.DeleteRangeRequest{RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyD}})

mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 2)
-		require.False(t, ba.AsyncConsensus)
+		require.True(t, ba.AsyncConsensus)
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &kvpb.DeleteRangeRequest{}, ba.Requests[1].GetInner())

@@ -553,8 +555,12 @@ func TestTxnPipelinerRangedWrites(t *testing.T) {
br, pErr := tp.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
-	// The PutRequest was not run asynchronously, so it is not outstanding.
-	require.Equal(t, 0, tp.ifWrites.len())
+	// The PutRequest was run asynchronously, so it has outstanding writes.
+	require.Equal(t, 1, tp.ifWrites.len())
+
+	// Clear the outstanding write added by the Put request from the set of
+	// in-flight writes before inserting new writes below.
+	tp.ifWrites.clear(true /* reuse */)

// Add five keys into the in-flight writes set, one of which overlaps with
// the Put request and two others which also overlap with the DeleteRange
@@ -570,7 +576,7 @@

mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 5)
-		require.False(t, ba.AsyncConsensus)
+		require.True(t, ba.AsyncConsensus)
require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[1].GetInner())
require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[2].GetInner())
@@ -604,7 +610,31 @@
br, pErr = tp.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
-	require.Equal(t, 2, tp.ifWrites.len())
+	// The Put will be added to ifWrites, as will the 2 writes from before
+	// that weren't covered by QueryIntent requests.
+	require.Equal(t, 3, tp.ifWrites.len())
+
+	// Now, test RefreshRangeRequests, which cannot be pipelined.
+	ba = &kvpb.BatchRequest{}
+	ba.Header = kvpb.Header{Txn: &txn}
+	ba.Add(&kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}})
+	ba.Add(&kvpb.RefreshRangeRequest{RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyD}})
+
+	mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+		require.Len(t, ba.Requests, 2)
+		require.False(t, ba.AsyncConsensus)
+		require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
+		require.IsType(t, &kvpb.RefreshRangeRequest{}, ba.Requests[1].GetInner())
+
+		br = ba.CreateReply()
+		br.Txn = ba.Txn
+		return br, nil
+	})
+
+	br, pErr = tp.SendLocked(ctx, ba)
+	require.Nil(t, pErr)
+	require.NotNil(t, br)
+	require.Equal(t, 0, tp.ifWrites.len())
}

// TestTxnPipelinerNonTransactionalRequests tests that non-transaction requests
Expand Down Expand Up @@ -913,7 +943,7 @@ func TestTxnPipelinerIntentMissingError(t *testing.T) {
t.Run(fmt.Sprintf("errIdx=%d", errIdx), func(t *testing.T) {
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 7)
-				require.False(t, ba.AsyncConsensus)
+				require.True(t, ba.AsyncConsensus)
require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[1].GetInner())
require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[2].GetInner())
@@ -939,6 +969,67 @@
}
}

+// TestTxnPipelinerDeleteRangeRequests ensures the txnPipeliner correctly
+// decides whether to pipeline DeleteRangeRequests. In particular, it ensures
+// DeleteRangeRequests are pipelined only if the batch doesn't contain an
+// EndTxn request.
+func TestTxnPipelinerDeleteRangeRequests(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+	tp, mockSender := makeMockTxnPipeliner(nil /* iter */)
+
+	txn := makeTxnProto()
+	keyA, keyB, keyD, keyE := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("d"), roachpb.Key("e")
+
+	ba := &kvpb.BatchRequest{}
+	ba.Header = kvpb.Header{Txn: &txn}
+	ba.Add(&kvpb.DeleteRangeRequest{RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyD}})
+
+	mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+		require.Len(t, ba.Requests, 1)
+		require.True(t, ba.AsyncConsensus)
+		require.IsType(t, &kvpb.DeleteRangeRequest{}, ba.Requests[0].GetInner())
+
+		br := ba.CreateReply()
+		br.Txn = ba.Txn
+		resp := br.Responses[0].GetInner()
+		resp.(*kvpb.DeleteRangeResponse).Keys = []roachpb.Key{keyB}
+		return br, nil
+	})
+
+	br, pErr := tp.SendLocked(ctx, ba)
+	require.Nil(t, pErr)
+	require.NotNil(t, br)
+	require.Equal(t, 1, tp.ifWrites.len())
+
+	// Now, create a batch which has (another) DeleteRangeRequest and an
+	// EndTxnRequest as well.
+	ba = &kvpb.BatchRequest{}
+	ba.Header = kvpb.Header{Txn: &txn}
+	ba.Add(&kvpb.DeleteRangeRequest{RequestHeader: kvpb.RequestHeader{Key: keyD, EndKey: keyE}})
+	ba.Add(&kvpb.EndTxnRequest{Commit: true})
+	mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+		require.Len(t, ba.Requests, 3)
+		require.False(t, ba.AsyncConsensus)
+		require.IsType(t, &kvpb.DeleteRangeRequest{}, ba.Requests[0].GetInner())
+		require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[1].GetInner())
+		require.Equal(t, ba.Requests[1].GetInner().(*kvpb.QueryIntentRequest).Key, keyB)
+		require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[2].GetInner())
+
+		br = ba.CreateReply()
+		br.Txn = ba.Txn
+		resp := br.Responses[1].GetInner()
+		resp.(*kvpb.QueryIntentResponse).FoundUnpushedIntent = true
+		return br, nil
+	})
+
+	br, pErr = tp.SendLocked(ctx, ba)
+	require.Nil(t, pErr)
+	require.NotNil(t, br)
+	require.Equal(t, 0, tp.ifWrites.len()) // should be cleared out
+}

// TestTxnPipelinerEnableDisableMixTxn tests that the txnPipeliner behaves
// correctly if pipelining is enabled or disabled midway through a transaction.
func TestTxnPipelinerEnableDisableMixTxn(t *testing.T) {
