kv: batch ranged intent resolution
Fixes #46752.
Resolves the recent perf regression on TPC-C.

This commit follows in the footsteps of #34803 and introduces batching for
ranged intent resolution, where previously only point intent resolution was
batched. As we found in #46752, this is more important than it has been in
the past, because implicit SELECT FOR UPDATE acquires a ranged lock on each
row that it updates instead of a single-key lock.

The change addresses this by adding a third request batcher to IntentResolver.
ResolveIntent requests and ResolveIntentRange requests are batched separately,
which is necessary for MaxSpanRequestKeys to work properly (in fact, for such
batches to be accepted by DistSender at all).
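
As a rough sketch, constructing the point and ranged batchers side by side
might look like the following. The variable names and limits here are
illustrative, not the actual IntentResolver code; only Config fields shown in
this diff are used:

    irBatcher := requestbatcher.New(requestbatcher.Config{
        MaxMsgsPerBatch: 100, // illustrative batch-size limit
        Sender:          sender,
        Stopper:         stopper,
    })
    irRangeBatcher := requestbatcher.New(requestbatcher.Config{
        MaxMsgsPerBatch:    100,  // illustrative batch-size limit
        MaxKeysPerBatchReq: 1000, // illustrative; enables pagination
        Sender:             sender,
        Stopper:            stopper,
    })

Keeping the two batchers separate ensures that MaxSpanRequestKeys is only ever
attached to batches made up entirely of ranged requests.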

To accommodate the ranged nature of ResolveIntentRange requests, the notion of
pagination is introduced into RequestBatcher. A MaxKeysPerBatchReq option is
added to the configuration of a RequestBatcher, which corresponds to the
MaxSpanRequestKeys value set on each BatchRequest header. The RequestBatcher is
then taught about request pagination and how to work with partial responses. See
the previous commit for clarification about the semantics at play here.
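
To make the shape of that loop concrete, here is a minimal, self-contained
sketch of resume-span pagination. Toy byte-keyed spans and a shared per-round
key budget stand in for roachpb requests and DistSender's MaxSpanRequestKeys
handling; this is illustrative, not the actual implementation:

    package main

    import "fmt"

    type span struct{ key, end byte } // covers keys in [key, end)

    func main() {
        reqs := []span{{'d', 'g'}, {'a', 'd'}, {'b', 'm'}}
        const budget = 5 // per-round key budget, akin to MaxSpanRequestKeys
        for round := 1; len(reqs) > 0; round++ {
            remaining := budget
            var next []span
            for _, r := range reqs {
                n := int(r.end - r.key) // keys this request still covers
                if n <= remaining {
                    remaining -= n // request runs to completion this round
                    continue
                }
                // Budget exhausted mid-request: keep a trimmed resume span
                // and send it again in the next round.
                next = append(next, span{r.key + byte(remaining), r.end})
                remaining = 0
            }
            fmt.Printf("round %d: %d request(s) resumed\n", round, len(next))
            reqs = next
        }
    }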

Release justification: important fix to avoid a performance regression when
implicit SELECT FOR UPDATE is enabled.
nvanbenschoten committed Apr 4, 2020
1 parent 5ec87f1 commit cf11645
Showing 6 changed files with 408 additions and 257 deletions.
78 changes: 67 additions & 11 deletions pkg/internal/client/requestbatcher/batcher.go
@@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)
@@ -109,6 +110,13 @@ type Config struct {
// If MaxMsgsPerBatch <= 0 then no limit is enforced.
MaxMsgsPerBatch int

// MaxKeysPerBatchReq is the maximum number of keys that each batch is
// allowed to touch during one of its requests. If the limit is exceeded,
// the batch is paginated over a series of individual requests. This limit
// corresponds to the MaxSpanRequestKeys assigned to the Header of each
// request. If MaxKeysPerBatchReq <= 0 then no limit is enforced.
MaxKeysPerBatchReq int

// MaxWait is the maximum amount of time a message should wait in a batch
// before being sent. If MaxWait is <= 0 then no wait timeout is enforced.
// It is inadvisable to disable both MaxIdle and MaxWait.
@@ -264,7 +272,7 @@ func (b *RequestBatcher) sendBatch(ctx context.Context, ba *batch) {
var br *roachpb.BatchResponse
send := func(ctx context.Context) error {
var pErr *roachpb.Error
if br, pErr = b.cfg.Sender.Send(ctx, ba.batchRequest()); pErr != nil {
if br, pErr = b.cfg.Sender.Send(ctx, ba.batchRequest(&b.cfg)); pErr != nil {
return pErr.GoError()
}
return nil
@@ -276,16 +284,61 @@
ctx, b.sendBatchOpName, timeutil.Until(ba.sendDeadline), actualSend)
}
}
err := send(ctx)
for i, r := range ba.reqs {
res := Response{}
if br != nil && i < len(br.Responses) {
res.Resp = br.Responses[i].GetInner()
}
if err != nil {
res.Err = err
// Send requests in a loop to support pagination, which may be necessary
// if MaxKeysPerBatchReq is set. If so, partial responses with resume
// spans may be returned for requests, indicating that the limit was hit
// before they could complete and that they should be resumed over the
// specified key span. Requests in the batch are neither guaranteed to
// be ordered nor guaranteed to be non-overlapping, so we can make no
// assumptions about the requests that will result in full responses
// (with no resume spans) vs. partial responses vs. empty responses (see
// the comment on roachpb.Header.MaxSpanRequestKeys).
//
// To accommodate this, we keep track of all partial responses from
// previous iterations. After receiving a batch of responses during an
// iteration, the responses are each combined with the previous response
// for their corresponding requests. From there, responses that have no
// resume spans are removed. Responses that have resume spans are
// updated appropriately and sent again in the next iteration. The loop
// proceeds until all requests have been run to completion.
var prevResps []roachpb.Response
for len(ba.reqs) > 0 {
err := send(ctx)
nextReqs, nextPrevResps := ba.reqs[:0], prevResps[:0]
for i, r := range ba.reqs {
var res Response
if br != nil {
resp := br.Responses[i].GetInner()
if prevResps != nil {
prevResp := prevResps[i]
if cErr := roachpb.CombineResponses(prevResp, resp); cErr != nil {
log.Fatal(ctx, cErr)
}
resp = prevResp
}
if resume := resp.Header().ResumeSpan; resume != nil {
// Add a trimmed request to the next batch.
h := r.req.Header()
h.SetSpan(*resume)
r.req = r.req.ShallowCopy()
r.req.SetHeader(h)
nextReqs = append(nextReqs, r)
// Strip resume span from previous response and record.
prevH := resp.Header()
prevH.ResumeSpan = nil
prevResp := resp
prevResp.SetHeader(prevH)
nextPrevResps = append(nextPrevResps, prevResp)
continue
}
res.Resp = resp
}
if err != nil {
res.Err = err
}
b.sendResponse(r, res)
}
b.sendResponse(r, res)
ba.reqs, prevResps = nextReqs, nextPrevResps
}
})
}
@@ -462,14 +515,17 @@ func (b *batch) rangeID() roachpb.RangeID {
return b.reqs[0].rangeID
}

func (b *batch) batchRequest() roachpb.BatchRequest {
func (b *batch) batchRequest(cfg *Config) roachpb.BatchRequest {
req := roachpb.BatchRequest{
// Preallocate the Requests slice.
Requests: make([]roachpb.RequestUnion, 0, len(b.reqs)),
}
for _, r := range b.reqs {
req.Add(r.req)
}
if cfg.MaxKeysPerBatchReq > 0 {
req.MaxSpanRequestKeys = int64(cfg.MaxKeysPerBatchReq)
}
return req
}

184 changes: 149 additions & 35 deletions pkg/internal/client/requestbatcher/batcher_test.go
@@ -29,6 +29,8 @@ import (
)

type batchResp struct {
// TODO(ajwerner): we never actually test that this result is what we expect
// it to be. We should add a test that does so.
br *roachpb.BatchResponse
pe *roachpb.Error
}
@@ -58,6 +60,22 @@ func (c chanSender) Send(
}
}

type senderGroup struct {
b *RequestBatcher
g errgroup.Group
}

func (g *senderGroup) Send(rangeID roachpb.RangeID, request roachpb.Request) {
g.g.Go(func() error {
_, err := g.b.Send(context.Background(), rangeID, request)
return err
})
}

func (g *senderGroup) Wait() error {
return g.g.Wait()
}

func TestBatcherSendOnSizeWithReset(t *testing.T) {
// This test ensures that when a single batch ends up sending due to size
// constraints, its timer is successfully canceled and does not lead to a
@@ -84,15 +102,9 @@ func TestBatcherSendOnSizeWithReset(t *testing.T) {
Sender: sc,
Stopper: stopper,
})
var g errgroup.Group
sendRequest := func(rangeID roachpb.RangeID, request roachpb.Request) {
g.Go(func() error {
_, err := b.Send(context.Background(), rangeID, request)
return err
})
}
sendRequest(1, &roachpb.GetRequest{})
sendRequest(1, &roachpb.GetRequest{})
g := senderGroup{b: b}
g.Send(1, &roachpb.GetRequest{})
g.Send(1, &roachpb.GetRequest{})
s := <-sc
s.respChan <- batchResp{}
// See the comment above wait. In rare cases the batch will be sent before the
@@ -212,22 +224,16 @@ func TestBatcherSend(t *testing.T) {
Sender: sc,
Stopper: stopper,
})
var g errgroup.Group
sendRequest := func(rangeID roachpb.RangeID, request roachpb.Request) {
g.Go(func() error {
_, err := b.Send(context.Background(), rangeID, request)
return err
})
}
// Send 3 requests to range 2 and 2 to range 1.
// The 3rd range 2 request will trigger immediate sending due to the
// MaxMsgsPerBatch configuration. The range 1 batch will be sent after the
// MaxWait timeout expires.
sendRequest(1, &roachpb.GetRequest{})
sendRequest(2, &roachpb.GetRequest{})
sendRequest(1, &roachpb.GetRequest{})
sendRequest(2, &roachpb.GetRequest{})
sendRequest(2, &roachpb.GetRequest{})
g := senderGroup{b: b}
g.Send(1, &roachpb.GetRequest{})
g.Send(2, &roachpb.GetRequest{})
g.Send(1, &roachpb.GetRequest{})
g.Send(2, &roachpb.GetRequest{})
g.Send(2, &roachpb.GetRequest{})
// Wait for the range 2 request and ensure it contains 3 requests.
s := <-sc
assert.Len(t, s.ba.Requests, 3)
@@ -305,7 +311,7 @@ func TestPanicWithNilStopper(t *testing.T) {
New(Config{Sender: make(chanSender)})
}

// TestBatchTimeout verfies the the RequestBatcher uses the context with the
// TestBatchTimeout verifies that the RequestBatcher uses the context with the
// deadline from the latest call to send.
func TestBatchTimeout(t *testing.T) {
defer leaktest.AfterTest(t)()
@@ -405,24 +411,17 @@ func TestIdleAndMaxTimeoutDisabled(t *testing.T) {
Sender: sc,
Stopper: stopper,
})
var g errgroup.Group
sendRequest := func(rangeID roachpb.RangeID, request roachpb.Request) {
g.Go(func() error {
_, err := b.Send(context.Background(), rangeID, request)
return err
})
}
// Send 3 requests to range 2 and 2 to range 1.
// The 3rd range 2 request will trigger immediate sending due to the
// MaxMsgsPerBatch configuration. The range 1 batch will be sent after the
// MaxWait timeout expires.
sendRequest(1, &roachpb.GetRequest{})
// Send 2 requests to range 1. Even with an arbitrarily large delay between
// the requests, they should only be sent when the MaxMsgsPerBatch limit is
// reached, because no MaxWait timeout is configured.
g := senderGroup{b: b}
g.Send(1, &roachpb.GetRequest{})
select {
case <-sc:
t.Fatalf("RequestBatcher should not send based on time")
case <-time.After(10 * time.Millisecond):
}
sendRequest(1, &roachpb.GetRequest{})
g.Send(1, &roachpb.GetRequest{})
s := <-sc
assert.Len(t, s.ba.Requests, 2)
s.respChan <- batchResp{}
@@ -432,6 +431,121 @@
}
}

// TestMaxKeysPerBatchReq exercises the RequestBatcher when it is configured to
// assign each request a MaxSpanRequestKeys limit. When such a limit is used,
// the RequestBatcher may receive partial responses to the requests that it
// issues, so it needs to be prepared to paginate requests and combine partial
// responses.
func TestMaxKeysPerBatchReq(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
sc := make(chanSender)
b := New(Config{
MaxMsgsPerBatch: 3,
MaxKeysPerBatchReq: 5,
Sender: sc,
Stopper: stopper,
})
// Send 3 ResolveIntentRange requests. The requests are limited so
// pagination will be required. The following sequence of partial
// results will be returned:
// send([{d-g}, {a-d}, {b-m}]) ->
// scans from [a, c) before hitting limit
// returns [{d-g}, {c-d}, {c-m}]
// send([{d-g}, {c-d}, {c-m}]) ->
// scans from [c, e) before hitting limit
// returns [{e-g}, {}, {e-m}]
// send([{e-g}, {e-m}]) ->
// scans from [e, h) before hitting limit
// returns [{}, {h-m}]
// send([{h-m}]) ->
// scans from [h, m) without hitting limit
// returns [{}]
type span [2]string // [key, endKey]
type spanMap map[span]span
var nilResumeSpan span
makeReq := func(sp span) *roachpb.ResolveIntentRangeRequest {
var req roachpb.ResolveIntentRangeRequest
req.Key = roachpb.Key(sp[0])
req.EndKey = roachpb.Key(sp[1])
return &req
}
makeResp := func(ba *roachpb.BatchRequest, resumeSpans spanMap) *roachpb.BatchResponse {
br := ba.CreateReply()
for i, ru := range ba.Requests {
req := ru.GetResolveIntentRange()
reqSp := span{string(req.Key), string(req.EndKey)}
resumeSp, ok := resumeSpans[reqSp]
if !ok {
t.Fatalf("unexpected request: %+v", req)
}
if resumeSp == nilResumeSpan {
continue
}
resp := br.Responses[i].GetResolveIntentRange()
resp.ResumeSpan = &roachpb.Span{
Key: roachpb.Key(resumeSp[0]), EndKey: roachpb.Key(resumeSp[1]),
}
resp.ResumeReason = roachpb.RESUME_KEY_LIMIT
}
return br
}
g := senderGroup{b: b}
g.Send(1, makeReq(span{"d", "g"}))
g.Send(1, makeReq(span{"a", "d"}))
g.Send(1, makeReq(span{"b", "m"}))
// send([{d-g}, {a-d}, {b-m}]) ->
// scans from [a, c) before hitting limit
// returns [{d-g}, {c-d}, {c-m}]
s := <-sc
assert.Equal(t, int64(5), s.ba.MaxSpanRequestKeys)
assert.Len(t, s.ba.Requests, 3)
br := makeResp(&s.ba, spanMap{
{"d", "g"}: {"d", "g"},
{"a", "d"}: {"c", "d"},
{"b", "m"}: {"c", "m"},
})
s.respChan <- batchResp{br: br}
// send([{d-g}, {c-d}, {c-m}]) ->
// scans from [c, e) before hitting limit
// returns [{e-g}, {}, {e-m}]
s = <-sc
assert.Equal(t, int64(5), s.ba.MaxSpanRequestKeys)
assert.Len(t, s.ba.Requests, 3)
br = makeResp(&s.ba, spanMap{
{"d", "g"}: {"e", "g"},
{"c", "d"}: nilResumeSpan,
{"c", "m"}: {"e", "m"},
})
s.respChan <- batchResp{br: br}
// send([{e-g}, {e-m}]) ->
// scans from [e, h) before hitting limit
// returns [{}, {h-m}]
s = <-sc
assert.Equal(t, int64(5), s.ba.MaxSpanRequestKeys)
assert.Len(t, s.ba.Requests, 2)
br = makeResp(&s.ba, spanMap{
{"e", "g"}: nilResumeSpan,
{"e", "m"}: {"h", "m"},
})
s.respChan <- batchResp{br: br}
// send([{h-m}]) ->
// scans from [h, m) without hitting limit
// returns [{}]
s = <-sc
assert.Equal(t, int64(5), s.ba.MaxSpanRequestKeys)
assert.Len(t, s.ba.Requests, 1)
br = makeResp(&s.ba, spanMap{
{"h", "m"}: nilResumeSpan,
})
s.respChan <- batchResp{br: br}
// Make sure everything gets a response.
if err := g.Wait(); err != nil {
t.Fatalf("expected no errors, got %v", err)
}
}

func TestPanicWithNilSender(t *testing.T) {
defer leaktest.AfterTest(t)()
defer func() {