kv: use response keys of ranged writes for lock tracking #121086

Merged
41 changes: 26 additions & 15 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
@@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities/tenantcapabilitiesauthorizer"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -1119,6 +1120,17 @@ func TestTxnCoordSenderNoDuplicateLockSpans(t *testing.T) {
t.Errorf("Invalid lock spans: %+v; expected %+v", et.LockSpans, expectedLockSpans)
}
br.Txn.Status = roachpb.COMMITTED
} else {
// Pre-EndTxn requests.
require.Len(t, ba.Requests, 1)
if sArgs, ok := ba.GetArg(kvpb.Scan); ok {
require.Equal(t, lock.Shared, sArgs.(*kvpb.ScanRequest).KeyLockingStrength)
br.Responses[0].GetScan().Rows = []roachpb.KeyValue{{Key: roachpb.Key("a")}}
}
if drArgs, ok := ba.GetArg(kvpb.DeleteRange); ok {
require.True(t, drArgs.(*kvpb.DeleteRangeRequest).ReturnKeys)
br.Responses[0].GetDeleteRange().Keys = []roachpb.Key{roachpb.Key("u"), roachpb.Key("w")}
}
}
return br, nil
}
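
A minimal, self-contained sketch of the interception pattern used above (toy types, not the real kvpb API): the test's sender function inspects each request and fabricates which keys were found, and therefore locked, so the client-side lock tracking can be exercised deterministically.

package main

import "fmt"

// batch and response are toy stand-ins for kvpb.BatchRequest/BatchResponse.
type batch struct{ method string }
type response struct{ keys []string }

// senderFn mimics the test hook: it decides, per request type, which keys
// the "server" reports back as locked.
type senderFn func(ba batch) response

func main() {
	var send senderFn = func(ba batch) response {
		switch ba.method {
		case "Scan":
			return response{keys: []string{"a"}} // the scan only finds "a"
		case "DeleteRange":
			return response{keys: []string{"u", "w"}} // only "u" and "w" get deleted
		}
		return response{}
	}
	fmt.Println(send(batch{method: "DeleteRange"}).keys) // [u w]
}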
@@ -1138,7 +1150,8 @@ func TestTxnCoordSenderNoDuplicateLockSpans(t *testing.T) {
db := kv.NewDB(ambient, factory, clock, stopper)
txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */)

// Acquire locks on a-b, c, m, u-w before the final batch.
// Acquire locks on a, c, m, u, x before the final batch.
// NOTE: ScanForShare finds and locks "a" (see senderFn).
_, pErr := txn.ScanForShare(
ctx, roachpb.Key("a"), roachpb.Key("b"), 0, kvpb.GuaranteedDurability,
)
@@ -1149,35 +1162,33 @@
if pErr != nil {
t.Fatal(pErr)
}
// NOTE: GetForUpdate does not find a key to lock.
_, pErr = txn.GetForUpdate(ctx, roachpb.Key("m"), kvpb.GuaranteedDurability)
if pErr != nil {
t.Fatal(pErr)
}
_, pErr = txn.DelRange(ctx, roachpb.Key("u"), roachpb.Key("w"), false /* returnKeys */)
// NOTE: DelRange finds and locks "u" and "w" (see senderFn).
_, pErr = txn.DelRange(ctx, roachpb.Key("u"), roachpb.Key("x"), false /* returnKeys */)
if pErr != nil {
t.Fatal(pErr)
}

// The final batch overwrites key c, reads key n, and overlaps part of the a-b and u-w ranges.
// The final batch overwrites key c, reads key n, and overlaps w with the v-z range.
b := txn.NewBatch()
b.Put(roachpb.Key("b"), []byte("value"))
b.Put(roachpb.Key("c"), []byte("value"))
b.Put(roachpb.Key("d"), []byte("value"))
b.DelRange(roachpb.Key("c"), roachpb.Key("e"), true /* returnKeys */)
b.Put(roachpb.Key("f"), []byte("value"))
b.GetForUpdate(roachpb.Key("n"), kvpb.GuaranteedDurability)
b.ReverseScanForShare(roachpb.Key("v"), roachpb.Key("z"), kvpb.GuaranteedDurability)

// The expected locks are a-b, c, m, n, and v-z.
//
// A note about the v-z span -- because the DeleteRange request did not
// actually delete any keys, we'll not track anything for it in the lock
// footprint for this transaction. The v-z range comes from the
// ReverseScanForShare request in the final batch.
// The expected locks are a, b, c-e, f, n, u, and v-z.
expectedLockSpans = []roachpb.Span{
{Key: roachpb.Key("a"), EndKey: roachpb.Key("b").Next()},
{Key: roachpb.Key("c"), EndKey: nil},
{Key: roachpb.Key("d"), EndKey: nil},
{Key: roachpb.Key("m"), EndKey: nil},
{Key: roachpb.Key("a"), EndKey: nil},
{Key: roachpb.Key("b"), EndKey: nil},
{Key: roachpb.Key("c"), EndKey: roachpb.Key("e")},
{Key: roachpb.Key("f"), EndKey: nil},
{Key: roachpb.Key("n"), EndKey: nil},
{Key: roachpb.Key("u"), EndKey: nil},
{Key: roachpb.Key("v"), EndKey: roachpb.Key("z")},
}

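The expected-spans list above is the deduplicated union of everything the transaction locked, with ranged writes contributing only their response keys rather than their full request spans. A rough sketch of the merging behavior, assuming nothing about the real condensableSpanSet implementation:

package main

import (
	"fmt"
	"sort"
)

// span mirrors roachpb.Span; endKey == "" means a point span.
type span struct{ key, endKey string }

func (s span) end() string {
	if s.endKey == "" {
		return s.key + "\x00" // a point span covers [key, key.Next())
	}
	return s.endKey
}

// merge sorts spans and collapses overlaps, which is why locking "c" and
// then DelRange'ing [c, e) in the test yields the single entry c-e.
func merge(spans []span) []span {
	sort.Slice(spans, func(i, j int) bool { return spans[i].key < spans[j].key })
	var out []span
	for _, s := range spans {
		if n := len(out); n > 0 && s.key <= out[n-1].end() {
			if s.end() > out[n-1].end() {
				out[n-1].endKey = s.end()
			}
			continue
		}
		out = append(out, s)
	}
	return out
}

func main() {
	fmt.Println(merge([]span{
		{key: "c"}, {key: "c", endKey: "e"}, // Put(c) + DelRange(c, e) collapse
		{key: "u"}, {key: "v", endKey: "z"}, // response key "u" stays a point
	})) // [{c e} {u } {v z}]
}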
73 changes: 40 additions & 33 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
@@ -211,6 +211,11 @@ type txnPipeliner struct {
// In-flight writes are intent point writes that have not yet been proved
// to have succeeded. They will need to be proven before the transaction
// can commit.
// TODO(nvanbenschoten): once we start tracking locking read requests in
// this set, we should decide on whether to rename "in-flight writes" to
// something else. We could rename to "in-flight locks". Or we could keep
// "in-flight writes" but make it clear that these are lock writes of any
// strength, and not just intent writes.
ifWrites inFlightWriteSet
// The in-flight writes chain index is used to uniquely identify calls to
// chainToInFlightWrites, so that each call can limit itself to adding a
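
For context on the comment above, a toy sketch of the in-flight write set's shape (the real one is btree-backed; the names here are illustrative only): each entry is a point write awaiting proof of replication, and its byte size is charged against the same budget as the lock footprint.

package main

import "fmt"

// inFlightWrite is a point write (or, per the TODO above, eventually any
// point lock) whose replication has not yet been proven.
type inFlightWrite struct {
	key string
	seq int32 // sequence number of the write within the txn
}

// inFlightWriteSet is a simplified stand-in for the btree-backed set.
type inFlightWriteSet struct {
	writes []inFlightWrite
	bytes  int64
}

func (s *inFlightWriteSet) insert(key string, seq int32) {
	s.writes = append(s.writes, inFlightWrite{key: key, seq: seq})
	s.bytes += int64(len(key))
}

// byteSize is what updateLockTracking subtracts from maxBytes to compute
// the remaining budget for the lock footprint.
func (s *inFlightWriteSet) byteSize() int64 { return s.bytes }

func main() {
	var ifWrites inFlightWriteSet
	ifWrites.insert("u", 1) // e.g. the two DeleteRange response keys
	ifWrites.insert("w", 1)
	fmt.Println(len(ifWrites.writes), ifWrites.byteSize()) // 2 2
}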
@@ -309,7 +314,11 @@ func (tp *txnPipeliner) SendLocked(
// go over budget despite the earlier pre-emptive check, then we stay over
// budget. Further requests will be rejected if they attempt to take more
// locks.
tp.updateLockTracking(ctx, ba, br, pErr, maxBytes, !rejectOverBudget /* condenseLocksIfOverBudget */)
if err := tp.updateLockTracking(
ctx, ba, br, pErr, maxBytes, !rejectOverBudget, /* condenseLocksIfOverBudget */
); err != nil {
return nil, kvpb.NewError(err)
}
if pErr != nil {
return nil, tp.adjustError(ctx, ba, pErr)
}
@@ -341,9 +350,11 @@ func (tp *txnPipeliner) maybeRejectOverBudget(ba *kvpb.BatchRequest, maxBytes in
}

var spans []roachpb.Span
ba.LockSpanIterate(nil /* br */, func(sp roachpb.Span, _ lock.Durability) {
if err := ba.LockSpanIterate(nil /* br */, func(sp roachpb.Span, _ lock.Durability) {
spans = append(spans, sp)
})
}); err != nil {
return errors.Wrap(err, "iterating lock spans")
}

// Compute how many bytes we can allocate for locks. We account for the
// inflight-writes conservatively, since these might turn into lock spans
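
LockSpanIterate's new error return is what forces the plumbing above; a minimal sketch of the shape (hypothetical names; the real signature lives in kvpb): iteration can now fail when the locked spans cannot be derived from the response, and every caller has to surface that instead of dropping it.

package main

import (
	"errors"
	"fmt"
)

type span struct{ key, endKey string }

// lockSpanIterate visits each locked span. The callback pattern is
// unchanged, but failures while deriving spans from a response propagate.
func lockSpanIterate(spans []span, ok bool, fn func(span)) error {
	if !ok {
		return errors.New("response missing keys needed to derive lock spans")
	}
	for _, s := range spans {
		fn(s)
	}
	return nil
}

func main() {
	var collected []span
	err := lockSpanIterate([]span{{key: "a"}}, true, func(s span) {
		collected = append(collected, s)
	})
	fmt.Println(collected, err) // [{a }] <nil>
}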
@@ -633,8 +644,10 @@ func (tp *txnPipeliner) updateLockTracking(
pErr *kvpb.Error,
maxBytes int64,
condenseLocksIfOverBudget bool,
) {
tp.updateLockTrackingInner(ctx, ba, br, pErr)
) error {
if err := tp.updateLockTrackingInner(ctx, ba, br, pErr); err != nil {
return err
}

// Deal with compacting the lock spans.

@@ -643,7 +656,7 @@
locksBudget := maxBytes - tp.ifWrites.byteSize()
// If we're below budget, there's nothing more to do.
if tp.lockFootprint.bytesSize() <= locksBudget {
return
return nil
}

// We're over budget. If condenseLocksIfOverBudget is set, we condense the
@@ -652,7 +665,7 @@
// txn if we fail (see the estimateSize() call).

if !condenseLocksIfOverBudget {
return
return nil
}

// After adding new writes to the lock footprint, check whether we need to
@@ -669,11 +682,12 @@
tp.txnMetrics.TxnsWithCondensedIntents.Inc(1)
tp.txnMetrics.TxnsWithCondensedIntentsGauge.Inc(1)
}
return nil
}
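
The budget arithmetic above in one line, as a sketch: in-flight writes are charged first because they may later migrate into the lock footprint, and only the remainder is available for tracked spans before condensing kicks in.

package main

import "fmt"

// overBudget mirrors the check in updateLockTracking: the lock footprint
// must fit in whatever maxBytes leaves over after the in-flight writes.
func overBudget(maxBytes, ifWritesBytes, footprintBytes int64) bool {
	locksBudget := maxBytes - ifWritesBytes
	return footprintBytes > locksBudget
}

func main() {
	fmt.Println(overBudget(4<<20, 1<<20, 2<<20)) // false: 2 MiB fits in 3 MiB
	fmt.Println(overBudget(4<<20, 3<<20, 2<<20)) // true: 2 MiB > 1 MiB left
}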

func (tp *txnPipeliner) updateLockTrackingInner(
ctx context.Context, ba *kvpb.BatchRequest, br *kvpb.BatchResponse, pErr *kvpb.Error,
) {
) error {
// If the request failed, add all lock acquisition attempts directly to the
// lock footprint. This reduces the likelihood of dangling locks blocking
// concurrent requests for extended periods of time. See #3346.
@@ -695,8 +709,7 @@ func (tp *txnPipeliner) updateLockTrackingInner(
copy(baStripped.Requests, ba.Requests[:pErr.Index.Index])
copy(baStripped.Requests[pErr.Index.Index:], ba.Requests[pErr.Index.Index+1:])
}
baStripped.LockSpanIterate(nil, tp.trackLocks)
return
return baStripped.LockSpanIterate(nil, tp.trackLocks)
}

// Similarly, if the transaction is now finalized, we don't need to
@@ -707,7 +720,7 @@ func (tp *txnPipeliner) updateLockTrackingInner(
// If the transaction is now ABORTED, add all locks acquired by the
// batch directly to the lock footprint. We don't know which of
// these succeeded.
ba.LockSpanIterate(nil, tp.trackLocks)
return ba.LockSpanIterate(nil, tp.trackLocks)
case roachpb.COMMITTED:
// If the transaction is now COMMITTED, it must not have any more
// in-flight writes, so clear them. Technically we should move all
@@ -717,10 +730,10 @@
/* reuse - we're not going to use this Btree again, so there's no point in
moving the nodes to a free list */
false)
return nil
default:
panic("unexpected")
}
return
}

for i, ru := range ba.Requests {
@@ -751,33 +764,27 @@ func (tp *txnPipeliner) updateLockTrackingInner(
}
} else if kvpb.IsLocking(req) {
// If the request intended to acquire locks, track its lock spans.
if ba.AsyncConsensus {
// Record any writes that were performed asynchronously. We'll
// need to prove that these succeeded sometime before we commit.
header := req.Header()
if kvpb.IsRange(req) {
switch req.Method() {
case kvpb.DeleteRange:
for _, key := range resp.(*kvpb.DeleteRangeResponse).Keys {
tp.ifWrites.insert(key, header.Sequence)
}
default:
log.Fatalf(ctx, "unexpected ranged request with AsyncConsensus: %s", req)
seq := req.Header().Sequence
trackLocks := func(span roachpb.Span, _ lock.Durability) {
if ba.AsyncConsensus {
// Record any writes that were performed asynchronously. We'll
// need to prove that these succeeded sometime before we commit.
if span.EndKey != nil {
log.Fatalf(ctx, "unexpected multi-key intent pipelined")
}
tp.ifWrites.insert(span.Key, seq)
} else {
tp.ifWrites.insert(header.Key, header.Sequence)
}
} else {
// If the lock acquisitions weren't performed asynchronously
// then add them directly to our lock footprint. Locking read
// requests will always hit this path because they will never
// use async consensus.
if sp, ok := kvpb.ActualSpan(req, resp); ok {
tp.lockFootprint.insert(sp)
// If the lock acquisitions weren't performed asynchronously
// then add them directly to our lock footprint.
tp.lockFootprint.insert(span)
}
}
if err := kvpb.LockSpanIterate(req, resp, trackLocks); err != nil {
return errors.Wrap(err, "iterating lock spans")
}
}
}
return nil
}
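
The trackLocks closure above collapses what used to be two special-cased paths into one: every span reported for the request, including the per-key response spans of a ranged write like DeleteRange, flows through a single callback that routes point spans to the in-flight write set under async consensus and everything else to the lock footprint. A condensed, self-contained sketch of that routing (toy types, not the real kvpb ones):

package main

import (
	"fmt"
	"log"
)

type span struct{ key, endKey string } // endKey == "" means a point span

type tracker struct {
	asyncConsensus bool
	inFlight       []string // keys still needing proof of replication
	footprint      []span   // spans tracked directly as locked
}

// trackLock mirrors the trackLocks closure: async-consensus batches may
// only pipeline point writes; everything else lands in the lock footprint.
func (t *tracker) trackLock(s span) {
	if t.asyncConsensus {
		if s.endKey != "" {
			log.Fatal("unexpected multi-key intent pipelined")
		}
		t.inFlight = append(t.inFlight, s.key)
		return
	}
	t.footprint = append(t.footprint, s)
}

func main() {
	t := &tracker{asyncConsensus: true}
	// A DeleteRange that deleted "u" and "w" reports those response keys as
	// point spans, so each can be pipelined individually.
	t.trackLock(span{key: "u"})
	t.trackLock(span{key: "w"})
	fmt.Println(t.inFlight) // [u w]
}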

func (tp *txnPipeliner) trackLocks(s roachpb.Span, _ lock.Durability) {