Skip to content

Commit

Permalink
kv: allow DeleteRangeRequests to be pipelined
Browse files Browse the repository at this point in the history
Previously, ranged requests could not be pipelined. However, there is no
good reason to not allow them to be pipelined -- we just have to take
extra care to correctly update in-flight writes tracking on the response
path. We do so now.

As part of this patch, we introduce two new flags -- canPipeline and
canParallelCommit. We use these flags to determine whether batches can
be pipelined or committed using parallel commits. This is in contrast to
before, where we derived this information from other flags
(isIntentWrite, !isRange). This wasn't strictly necessary for this
change, but helps clean up the concepts.

As a consequence of this change, we now have a distinction between
requests that can be pipelined and requests that can be part of a batch
that can be committed in parallel. Notably, this applies to
DeleteRangeRequests -- they can be pipelined, but cannot be committed in
parallel. That's because we need to have the entire write set upfront
when performing a parallel commit, lest we need to perform recovery --
we don't have this for DeleteRange requests.

In the future, we'll extend the concept of canPipeline
(and !canParallelCommit) to other locking ranged requests as well. In
particular, (replicated) locking {,Reverse}ScanRequests that want to
pipeline their lock acquisitions.

Closes cockroachdb#64723
Informs cockroachdb#117978

Release note: None
  • Loading branch information
arulajmani committed Mar 25, 2024
1 parent 4c299d1 commit ce93193
Show file tree
Hide file tree
Showing 9 changed files with 441 additions and 90 deletions.
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2228,6 +2228,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
},
{
name: "forwarded timestamp with delete range",
beforeTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "a", "put") // ensure DeleteRange is not a no-op
},
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
_, err := db.Get(ctx, "a") // read key to set ts cache
return err
Expand Down Expand Up @@ -2552,6 +2555,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
name: "forwarded timestamp with too many refreshes in batch commit " +
"with refresh",
refreshSpansCondenseFilter: disableCondensingRefreshSpans,
beforeTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "a", "put") // ensure DeleteRange is not a no-op
},
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
_, err := db.Get(ctx, "a") // set ts cache
return err
Expand Down
181 changes: 156 additions & 25 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1166,14 +1166,19 @@ func TestTxnCoordSenderNoDuplicateLockSpans(t *testing.T) {
b.GetForUpdate(roachpb.Key("n"), kvpb.GuaranteedDurability)
b.ReverseScanForShare(roachpb.Key("v"), roachpb.Key("z"), kvpb.GuaranteedDurability)

// The expected locks are a-b, c, m, n, and u-z.
// The expected locks are a-b, c, m, n, and v-z.
//
// A note about the v-z span -- because the DeleteRange request did not
// actually delete any keys, we'll not track anything for it in the lock
// footprint for this transaction. The v-z range comes from the
// ReverseScanForShare request in the final batch.
expectedLockSpans = []roachpb.Span{
{Key: roachpb.Key("a"), EndKey: roachpb.Key("b").Next()},
{Key: roachpb.Key("c"), EndKey: nil},
{Key: roachpb.Key("d"), EndKey: nil},
{Key: roachpb.Key("m"), EndKey: nil},
{Key: roachpb.Key("n"), EndKey: nil},
{Key: roachpb.Key("u"), EndKey: roachpb.Key("z")},
{Key: roachpb.Key("v"), EndKey: roachpb.Key("z")},
}

pErr = txn.CommitInBatch(ctx, b)
Expand Down Expand Up @@ -1986,6 +1991,15 @@ func TestCommitMutatingTransaction(t *testing.T) {
if !bytes.Equal(ba.Txn.Key, roachpb.Key("a")) {
t.Errorf("expected transaction key to be \"a\"; got %s", ba.Txn.Key)
}

if _, ok := ba.GetArg(kvpb.DeleteRange); ok {
// Simulate deleting a single key for DeleteRange requests. Unlike other
// point writes, pipelined DeleteRange writes are tracked by looking at
// the batch response.
resp := br.Responses[0].GetInner()
resp.(*kvpb.DeleteRangeResponse).Keys = []roachpb.Key{roachpb.Key("a")}
}

if et, ok := ba.GetArg(kvpb.EndTxn); ok {
if !et.(*kvpb.EndTxnRequest).Commit {
t.Errorf("expected commit to be true")
Expand All @@ -2009,61 +2023,178 @@ func TestCommitMutatingTransaction(t *testing.T) {
testArgs := []struct {
f func(ctx context.Context, txn *kv.Txn) error
expMethod kvpb.Method
// pointWrite is set if the method is a "point write", which means that it
// will be pipelined and we should expect a QueryIntent request at commit
// time.
pointWrite bool
// All retryable functions below involve writing to exactly one key. The
// write will be pipelined if it's not in the same batch as the
// EndTxnRequest, in which case we expect a single QueryIntent request to be
// added when trying to commit the transaction.
expQueryIntent bool
}{
{
f: func(ctx context.Context, txn *kv.Txn) error { return txn.Put(ctx, "a", "b") },
expMethod: kvpb.Put,
pointWrite: true,
f: func(ctx context.Context, txn *kv.Txn) error { return txn.Put(ctx, "a", "b") },
expMethod: kvpb.Put,
expQueryIntent: true,
},
{
f: func(ctx context.Context, txn *kv.Txn) error {
b := txn.NewBatch()
b.Put("a", "b")
return txn.CommitInBatch(ctx, b)
},
expMethod: kvpb.Put,
},
{
f: func(ctx context.Context, txn *kv.Txn) error { return txn.CPut(ctx, "a", "b", nil) },
expMethod: kvpb.ConditionalPut,
pointWrite: true,
f: func(ctx context.Context, txn *kv.Txn) error { return txn.CPut(ctx, "a", "b", nil) },
expMethod: kvpb.ConditionalPut,
expQueryIntent: true,
},
{
f: func(ctx context.Context, txn *kv.Txn) error {
b := txn.NewBatch()
b.CPut("a", "b", nil)
return txn.CommitInBatch(ctx, b)
},
expMethod: kvpb.ConditionalPut,
},
{
f: func(ctx context.Context, txn *kv.Txn) error {
_, err := txn.Inc(ctx, "a", 1)
return err
},
expMethod: kvpb.Increment,
pointWrite: true,
expMethod: kvpb.Increment,
expQueryIntent: true,
},
{
f: func(ctx context.Context, txn *kv.Txn) error {
b := txn.NewBatch()
b.Inc("a", 1)
return txn.CommitInBatch(ctx, b)
},
expMethod: kvpb.Increment,
},
{
f: func(ctx context.Context, txn *kv.Txn) error {
_, err := txn.Del(ctx, "a")
return err
},
expMethod: kvpb.Delete,
pointWrite: true,
expMethod: kvpb.Delete,
expQueryIntent: true,
},
{
f: func(ctx context.Context, txn *kv.Txn) error {
b := txn.NewBatch()
b.Del("a")
return txn.CommitInBatch(ctx, b)
},
expMethod: kvpb.Delete,
},
{
f: func(ctx context.Context, txn *kv.Txn) error {
_, err := txn.DelRange(ctx, "a", "b", false /* returnKeys */)
return err
},
expMethod: kvpb.DeleteRange,
pointWrite: false,
expMethod: kvpb.DeleteRange,
expQueryIntent: true,
},
{
f: func(ctx context.Context, txn *kv.Txn) error {
b := txn.NewBatch()
b.DelRange("a", "b", false /* returnKeys */)
return txn.CommitInBatch(ctx, b)
},
expMethod: kvpb.DeleteRange,
expQueryIntent: false,
},
}
for i, test := range testArgs {
for _, test := range testArgs {
t.Run(test.expMethod.String(), func(t *testing.T) {
calls = nil
db := kv.NewDB(log.MakeTestingAmbientCtxWithNewTracer(), factory, clock, stopper)
if err := db.Txn(ctx, test.f); err != nil {
t.Fatalf("%d: unexpected error on commit: %s", i, err)
}
require.NoError(t, db.Txn(ctx, test.f))
expectedCalls := []kvpb.Method{test.expMethod}
if test.pointWrite {
if test.expQueryIntent {
expectedCalls = append(expectedCalls, kvpb.QueryIntent)
}
expectedCalls = append(expectedCalls, kvpb.EndTxn)
if !reflect.DeepEqual(expectedCalls, calls) {
t.Fatalf("%d: expected %s, got %s", i, expectedCalls, calls)
require.Equal(t, expectedCalls, calls)
})
}
}

// TestCommitNoopDeleteRangeTransaction verifies the commit path of a
// transaction whose only operation is a DeleteRange that deletes nothing. When
// the DeleteRange is issued on its own, no EndTxn is expected to reach the
// sender; when it is issued in the same batch as the commit, the EndTxn is
// expected to be sent along with it.
func TestCommitNoopDeleteRangeTransaction(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	clock := hlc.NewClockForTesting(nil)
	ambient := log.MakeTestingAmbientCtxWithNewTracer()
	sender := &mockSender{}
	stopper := stop.NewStopper()
	defer stopper.Stop(ctx)

	// observed accumulates the methods of every batch that reaches the mock
	// sender; each subtest resets it before running its transaction.
	var observed []kvpb.Method
	sender.match(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
		// The DeleteRange response is deliberately left without any keys,
		// which is how a no-op DeleteRange appears to the client.
		reply := ba.CreateReply()
		reply.Txn = ba.Txn.Clone()

		observed = append(observed, ba.Methods()...)
		if anchor := ba.Txn.Key; !bytes.Equal(anchor, roachpb.Key("a")) {
			t.Errorf("expected transaction key to be \"a\"; got %s", anchor)
		}
		if arg, found := ba.GetArg(kvpb.EndTxn); found {
			if endTxn := arg.(*kvpb.EndTxnRequest); !endTxn.Commit {
				t.Errorf("expected commit to be true")
			}
			reply.Txn.Status = roachpb.COMMITTED
		}
		return reply, nil
	})

	factoryCfg := kvcoord.TxnCoordSenderFactoryConfig{
		AmbientCtx: ambient,
		Clock:      clock,
		Stopper:    stopper,
		Settings:   cluster.MakeTestingClusterSettings(),
	}
	factory := kvcoord.NewTxnCoordSenderFactory(factoryCfg, sender)

	cases := []struct {
		// txnFn is the body of the transaction under test.
		txnFn func(ctx context.Context, txn *kv.Txn) error
		// elideEndTxn is set when no EndTxn is expected at the sender.
		elideEndTxn bool
	}{
		{
			// Standalone DeleteRange; the commit happens afterwards.
			txnFn: func(ctx context.Context, txn *kv.Txn) error {
				_, err := txn.DelRange(ctx, "a", "d", true /* returnKeys */)
				return err
			},
			elideEndTxn: true,
		},
		{
			// DeleteRange issued in the same batch as the commit.
			txnFn: func(ctx context.Context, txn *kv.Txn) error {
				batch := txn.NewBatch()
				batch.DelRange("a", "b", true /* returnKeys */)
				return txn.CommitInBatch(ctx, batch)
			},
			elideEndTxn: false,
		},
	}

	for idx, tc := range cases {
		t.Run(fmt.Sprintf("#%d", idx), func(t *testing.T) {
			observed = nil
			db := kv.NewDB(log.MakeTestingAmbientCtxWithNewTracer(), factory, clock, stopper)
			require.NoError(t, db.Txn(ctx, tc.txnFn))

			want := []kvpb.Method{kvpb.DeleteRange}
			if !tc.elideEndTxn {
				want = append(want, kvpb.EndTxn)
			}
			require.Equal(t, want, observed)
		})
	}
}
Expand Down
25 changes: 7 additions & 18 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ func (tc *txnCommitter) SendLocked(
if err := tc.validateEndTxnBatch(ba); err != nil {
return nil, kvpb.NewError(err)
}

// Determine whether we can elide the EndTxn entirely. We can do so if the
// transaction is read-only, which we determine based on whether the EndTxn
// request contains any writes.
Expand Down Expand Up @@ -367,31 +366,21 @@ func (tc *txnCommitter) canCommitInParallel(ba *kvpb.BatchRequest, et *kvpb.EndT
for _, ru := range ba.Requests[:len(ba.Requests)-1] {
req := ru.GetInner()
switch {
case kvpb.IsIntentWrite(req):
if kvpb.IsRange(req) {
// Similar to how we can't pipeline ranged writes, we also can't
// commit in parallel with them. The reason for this is that the
// status resolution process for STAGING transactions wouldn't
// know where to look for the corresponding intents.
return false
}
// All other point writes are included in the EndTxn request's
// InFlightWrites set and are visible to the status resolution
// process for STAGING transactions. Populating InFlightWrites
// has already been done by the txnPipeliner.
case kvpb.CanParallelCommit(req):
// The request can be part of a batch that is committed in parallel.

case req.Method() == kvpb.QueryIntent:
// QueryIntent requests are compatable with parallel commits. The
// QueryIntent requests are compatible with parallel commits. The
// intents being queried are also attached to the EndTxn request's
// InFlightWrites set and are visible to the status resolution
// process for STAGING transactions. Populating InFlightWrites has
// already been done by the txnPipeliner.

default:
// All other request types, notably Get and Scan requests, are
// incompatible with parallel commits because their outcome is
// not taken into consideration by the status resolution process
// for STAGING transactions.
// All other request types, notably Get, Scan and DeleteRange requests,
// are incompatible with parallel commits because their outcome is not
// taken into consideration by the status resolution process for STAGING
// transactions.
return false
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestTxnCommitterElideEndTxn(t *testing.T) {
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 2)
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[1].GetInner())
require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[1].GetInner())

br := ba.CreateReply()
br.Txn = ba.Txn
Expand Down
48 changes: 33 additions & 15 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,18 +459,31 @@ func (tp *txnPipeliner) canUseAsyncConsensus(ctx context.Context, ba *kvpb.Batch
for _, ru := range ba.Requests {
req := ru.GetInner()

// Determine whether the current request prevents us from performing async
// consensus on the batch.
if !kvpb.IsIntentWrite(req) || kvpb.IsRange(req) {
// Only allow batches consisting of solely transactional point
// writes to perform consensus asynchronously.
// TODO(nvanbenschoten): We could allow batches with reads and point
// writes to perform async consensus, but this would be a bit
// tricky. Any read would need to chain on to any write that came
// before it in the batch and overlaps. For now, it doesn't seem
// worth it.
if req.Method() == kvpb.DeleteRange {
// Special handling for DeleteRangeRequests.
deleteRangeReq := req.(*kvpb.DeleteRangeRequest)
// We'll need the list of keys deleted to verify whether replication
// succeeded or not. Override ReturnKeys.
//
// NB: This means we'll return keys to the client even if it explicitly
// set this to false. If this proves to be a problem in practice, we can
// always add some tracking here and strip the response. Alternatively, we
// can disable DeleteRange pipelining entirely for requests that set this
// field to false.
//
// TODO(arul): Get rid of this flag entirely and always treat it as true.
// Now that we're overriding ReturnKeys here, the number of cases where
// this will be false are very few -- it's only when DeleteRange is part
// of the same batch as an EndTxn request.
deleteRangeReq.ReturnKeys = true
}

if !kvpb.CanPipeline(req) {
// The current request cannot be pipelined, so it prevents us from
// performing async consensus on the batch.
return false
}

// Inhibit async consensus if the batch would push us over the maximum
// tracking memory budget. If we allowed async consensus on this batch, its
// writes would need to be tracked precisely. By inhibiting async consensus,
Expand Down Expand Up @@ -730,12 +743,17 @@ func (tp *txnPipeliner) updateLockTrackingInner(
// Record any writes that were performed asynchronously. We'll
// need to prove that these succeeded sometime before we commit.
header := req.Header()
tp.ifWrites.insert(header.Key, header.Sequence)
// The request is not expected to be a ranged one, as we're only
// tracking one key in the ifWrites. Ranged requests do not admit
// ba.AsyncConsensus.
if kvpb.IsRange(req) {
log.Fatalf(ctx, "unexpected range request with AsyncConsensus: %s", req)
switch req.Method() {
case kvpb.DeleteRange:
for _, key := range resp.(*kvpb.DeleteRangeResponse).Keys {
tp.ifWrites.insert(key, header.Sequence)
}
default:
log.Fatalf(ctx, "unexpected ranged request with AsyncConsensus: %s", req)
}
} else {
tp.ifWrites.insert(header.Key, header.Sequence)
}
} else {
// If the lock acquisitions weren't performed asynchronously
Expand Down
Loading

0 comments on commit ce93193

Please sign in to comment.