Skip to content

Commit

Permalink
kv: use response keys of ranged writes for lock tracking
Browse files Browse the repository at this point in the history
Informs #117978.

This commit updates the `txnPipeliner` to use the response keys from
`Get`, `Scan`, `ReverseScan`, and `DeleteRange` requests to track
pipelined and non-pipelined lock acquisitions / intent writes, instead
of assuming that the requests could have left intents anywhere in their
request span. This more precise tracking avoid broad ranged intent
resolution when more narrow intent resolution is possible. It will also
be used by #117978 to track in-flight replicated lock acquisition.

This is a WIP. Some tests need to be updated.

Release note: None
  • Loading branch information
nvanbenschoten committed Mar 26, 2024
1 parent 94fe2d4 commit 1136532
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 48 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils"
"github.com/cockroachdb/cockroach/pkg/testutils/localtestcluster"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -1103,6 +1104,7 @@ func TestTxnMultipleCoord(t *testing.T) {
func TestTxnCoordSenderNoDuplicateLockSpans(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.IgnoreLint(t, "WIP")
ctx := context.Background()
stopper := stop.NewStopper()
clock := hlc.NewClockForTesting(timeutil.NewManualTime(timeutil.Unix(0, 123)))
Expand Down
68 changes: 35 additions & 33 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,11 @@ func (tp *txnPipeliner) SendLocked(
// go over budget despite the earlier pre-emptive check, then we stay over
// budget. Further requests will be rejected if they attempt to take more
// locks.
tp.updateLockTracking(ctx, ba, br, pErr, maxBytes, !rejectOverBudget /* condenseLocksIfOverBudget */)
if err := tp.updateLockTracking(
ctx, ba, br, pErr, maxBytes, !rejectOverBudget, /* condenseLocksIfOverBudget */
); err != nil {
return nil, kvpb.NewError(err)
}
if pErr != nil {
return nil, tp.adjustError(ctx, ba, pErr)
}
Expand Down Expand Up @@ -336,9 +340,11 @@ func (tp *txnPipeliner) maybeRejectOverBudget(ba *kvpb.BatchRequest, maxBytes in
}

var spans []roachpb.Span
ba.LockSpanIterate(nil /* br */, func(sp roachpb.Span, _ lock.Durability) {
if err := ba.LockSpanIterate(nil /* br */, func(sp roachpb.Span, _ lock.Durability) {
spans = append(spans, sp)
})
}); err != nil {
return errors.Wrap(err, "iterating lock spans")
}

// Compute how many bytes we can allocate for locks. We account for the
// inflight-writes conservatively, since these might turn into lock spans
Expand Down Expand Up @@ -621,8 +627,10 @@ func (tp *txnPipeliner) updateLockTracking(
pErr *kvpb.Error,
maxBytes int64,
condenseLocksIfOverBudget bool,
) {
tp.updateLockTrackingInner(ctx, ba, br, pErr)
) error {
if err := tp.updateLockTrackingInner(ctx, ba, br, pErr); err != nil {
return err
}

// Deal with compacting the lock spans.

Expand All @@ -631,7 +639,7 @@ func (tp *txnPipeliner) updateLockTracking(
locksBudget := maxBytes - tp.ifWrites.byteSize()
// If we're below budget, there's nothing more to do.
if tp.lockFootprint.bytesSize() <= locksBudget {
return
return nil
}

// We're over budget. If condenseLocksIfOverBudget is set, we condense the
Expand All @@ -640,7 +648,7 @@ func (tp *txnPipeliner) updateLockTracking(
// txn if we fail (see the estimateSize() call).

if !condenseLocksIfOverBudget {
return
return nil
}

// After adding new writes to the lock footprint, check whether we need to
Expand All @@ -657,11 +665,12 @@ func (tp *txnPipeliner) updateLockTracking(
tp.txnMetrics.TxnsWithCondensedIntents.Inc(1)
tp.txnMetrics.TxnsWithCondensedIntentsGauge.Inc(1)
}
return nil
}

func (tp *txnPipeliner) updateLockTrackingInner(
ctx context.Context, ba *kvpb.BatchRequest, br *kvpb.BatchResponse, pErr *kvpb.Error,
) {
) error {
// If the request failed, add all lock acquisitions attempts directly to the
// lock footprint. This reduces the likelihood of dangling locks blocking
// concurrent requests for extended periods of time. See #3346.
Expand All @@ -683,8 +692,7 @@ func (tp *txnPipeliner) updateLockTrackingInner(
copy(baStripped.Requests, ba.Requests[:pErr.Index.Index])
copy(baStripped.Requests[pErr.Index.Index:], ba.Requests[pErr.Index.Index+1:])
}
baStripped.LockSpanIterate(nil, tp.trackLocks)
return
return baStripped.LockSpanIterate(nil, tp.trackLocks)
}

// Similarly, if the transaction is now finalized, we don't need to
Expand All @@ -695,7 +703,7 @@ func (tp *txnPipeliner) updateLockTrackingInner(
// If the transaction is now ABORTED, add all locks acquired by the
// batch directly to the lock footprint. We don't know which of
// these succeeded.
ba.LockSpanIterate(nil, tp.trackLocks)
return ba.LockSpanIterate(nil, tp.trackLocks)
case roachpb.COMMITTED:
// If the transaction is now COMMITTED, it must not have any more
// in-flight writes, so clear them. Technically we should move all
Expand All @@ -705,10 +713,10 @@ func (tp *txnPipeliner) updateLockTrackingInner(
/* reuse - we're not going to use this Btree again, so there's no point in
moving the nodes to a free list */
false)
return nil
default:
panic("unexpected")
}
return
}

for i, ru := range ba.Requests {
Expand Down Expand Up @@ -739,33 +747,27 @@ func (tp *txnPipeliner) updateLockTrackingInner(
}
} else if kvpb.IsLocking(req) {
// If the request intended to acquire locks, track its lock spans.
if ba.AsyncConsensus {
// Record any writes that were performed asynchronously. We'll
// need to prove that these succeeded sometime before we commit.
header := req.Header()
if kvpb.IsRange(req) {
switch req.Method() {
case kvpb.DeleteRange:
for _, key := range resp.(*kvpb.DeleteRangeResponse).Keys {
tp.ifWrites.insert(key, header.Sequence)
}
default:
log.Fatalf(ctx, "unexpected ranged request with AsyncConsensus: %s", req)
seq := req.Header().Sequence
trackLocks := func(span roachpb.Span, _ lock.Durability) {
if ba.AsyncConsensus {
// Record any writes that were performed asynchronously. We'll
// need to prove that these succeeded sometime before we commit.
if span.EndKey != nil {
log.Fatalf(ctx, "unexpected multi-key intent pipelined")
}
tp.ifWrites.insert(span.Key, seq)
} else {
tp.ifWrites.insert(header.Key, header.Sequence)
}
} else {
// If the lock acquisitions weren't performed asynchronously
// then add them directly to our lock footprint. Locking read
// requests will always hit this path because they will never
// use async consensus.
if sp, ok := kvpb.ActualSpan(req, resp); ok {
tp.lockFootprint.insert(sp)
// If the lock acquisitions weren't performed asynchronously
// then add them directly to our lock footprint.
tp.lockFootprint.insert(span)
}
}
if err := kvpb.LockSpanIterate(req, resp, trackLocks); err != nil {
return errors.Wrap(err, "iterating lock spans")
}
}
}
return nil
}

func (tp *txnPipeliner) trackLocks(s roachpb.Span, _ lock.Durability) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -1465,6 +1466,7 @@ func TestTxnPipelinerMaxBatchSize(t *testing.T) {
func TestTxnPipelinerRecordsLocksOnFailure(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.IgnoreLint(t, "WIP")
ctx := context.Background()
tp, mockSender := makeMockTxnPipeliner(nil /* iter */)

Expand Down Expand Up @@ -1898,6 +1900,7 @@ func (s descriptorDBRangeIterator) Error() error {
func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.IgnoreLint(t, "WIP")
ctx := context.Background()

largeAs := make([]byte, 11)
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ go_test(
"//pkg/roachpb",
"//pkg/storage/enginepb",
"//pkg/testutils/echotest",
"//pkg/testutils/skip",
"//pkg/util/buildutil",
"//pkg/util/hlc",
"//pkg/util/protoutil",
Expand Down
69 changes: 55 additions & 14 deletions pkg/kv/kvpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,25 +474,47 @@ func (br *BatchResponse) String() string {
return strings.Join(str, ", ")
}

// LockSpanIterate calls the passed method with the key ranges of the
// transactional locks contained in the batch. Usually the key spans
// contained in the requests are used, but when a response contains a
// ResumeSpan the ResumeSpan is subtracted from the request span to
// provide a more minimal span of keys affected by the request.
func (ba *BatchRequest) LockSpanIterate(br *BatchResponse, fn func(roachpb.Span, lock.Durability)) {
// LockSpanIterate calls LockSpanIterate for each request in the batch.
func (ba *BatchRequest) LockSpanIterate(
br *BatchResponse, fn func(roachpb.Span, lock.Durability),
) error {
for i, arg := range ba.Requests {
req := arg.GetInner()
if !IsLocking(req) {
continue
}
var resp Response
if br != nil {
resp = br.Responses[i].GetInner()
}
if span, ok := ActualSpan(req, resp); ok {
fn(span, LockingDurability(req))
if err := LockSpanIterate(req, resp, fn); err != nil {
return err
}
}
return nil
}

// LockSpanIterate calls the passed function with the keys or key spans of the
// transactional locks acquired by the request. Usually the full key span that
// is addressed by the request is used. However, a more minimal span of keys can
// be provided in the following cases:
// - the request operates over a range of keys but the response includes the
// specific set of keys that were locked. In these cases, the provided
// function is called with each individual locked key.
// - the request operates over a range of keys but the response contains a
// ResumeSpan signifying the key spans not reached by the request. In these
// cases, the ResumeSpan is subtracted from the request span.
func LockSpanIterate(req Request, resp Response, fn func(roachpb.Span, lock.Durability)) error {
if !IsLocking(req) {
return nil
}
dur := LockingDurability(req)
if canIterateResponseKeys(req, resp) {
return ResponseKeyIterate(req, resp, func(key roachpb.Key) {
fn(roachpb.Span{Key: key}, dur)
})
}
if span, ok := actualSpan(req, resp); ok {
fn(span, dur)
}
return nil
}

// RefreshSpanIterate calls the passed function with the key spans of
Expand Down Expand Up @@ -536,18 +558,18 @@ func (ba *BatchRequest) RefreshSpanIterate(br *BatchResponse, fn func(roachpb.Sp
}
} else {
// Otherwise, call the function with the span which was operated on.
if span, ok := ActualSpan(req, resp); ok {
if span, ok := actualSpan(req, resp); ok {
fn(span)
}
}
}
return nil
}

// ActualSpan returns the actual request span which was operated on,
// actualSpan returns the actual request span which was operated on,
// according to the existence of a resume span in the response. If
// nothing was operated on, returns false.
func ActualSpan(req Request, resp Response) (roachpb.Span, bool) {
func actualSpan(req Request, resp Response) (roachpb.Span, bool) {
h := req.Header()
if resp != nil {
resumeSpan := resp.Header().ResumeSpan
Expand All @@ -567,6 +589,25 @@ func ActualSpan(req Request, resp Response) (roachpb.Span, bool) {
return h.Span(), true
}

// canIterateResponseKeys returns whether the response to the given request
// contains keys that can be iterated over using ResponseKeyIterate.
func canIterateResponseKeys(req Request, resp Response) bool {
if resp == nil {
return false
}
switch v := req.(type) {
case *GetRequest:
return true
case *ScanRequest:
return v.ScanFormat != COL_BATCH_RESPONSE
case *ReverseScanRequest:
return v.ScanFormat != COL_BATCH_RESPONSE
case *DeleteRangeRequest:
return v.ReturnKeys
}
return false
}

// ResponseKeyIterate calls the passed function with the keys returned
// in the provided request's response. If no keys are being returned,
// the function will not be called.
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvpb/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -224,6 +225,7 @@ func TestBatchRequestSummary(t *testing.T) {
}

func TestLockSpanIterate(t *testing.T) {
skip.IgnoreLint(t, "WIP")
type testReq struct {
req Request
resp Response
Expand Down Expand Up @@ -261,7 +263,8 @@ func TestLockSpanIterate(t *testing.T) {
fn := func(span roachpb.Span, dur lock.Durability) {
spans[dur] = append(spans[dur], span)
}
ba.LockSpanIterate(&br, fn)
err := ba.LockSpanIterate(&br, fn)
require.NoError(t, err)

toExpSpans := func(trs ...testReq) []roachpb.Span {
exp := make([]roachpb.Span, len(trs))
Expand Down Expand Up @@ -534,6 +537,7 @@ func TestResponseKeyIterate(t *testing.T) {
require.Error(t, err)
require.Regexp(t, tc.expErr, err)
}
require.Equal(t, err == nil, canIterateResponseKeys(tc.req, tc.resp))
})
}
}
Expand Down

0 comments on commit 1136532

Please sign in to comment.