Skip to content

Commit

Permalink
intentresolver: Add byte pagination to RequestBatcher and IntentResolver
Browse files Browse the repository at this point in the history
Informs: cockroachdb#77228

Intent resolution batches are sequenced on raft and each batch can
consist of 100-200 intents. If an intent key, or in some cases even its
value, is large, it is possible that resolving all intents in the batch
would result in a raft command whose size exceeds the max raft command
size (kv.raft.command.max_size).

In PR cockroachdb#94814, we added support for TargetBytes for resolve intent and
resolve intent range raft commands.

In this PR, we set the TargetBytes field in the header of the batch
requests that carry resolve intent and resolve intent range requests,
as sent by the RequestBatcher and IntentResolver, to 4MB. This allows us
to stop resolving intents in the batch before we get close to the max
raft command size, lowering the chance that we exceed this limit.

Release note: None
  • Loading branch information
KaiSun314 committed Jan 30, 2023
1 parent 10ef5d9 commit 37d01c3
Show file tree
Hide file tree
Showing 8 changed files with 313 additions and 26 deletions.
1 change: 1 addition & 0 deletions pkg/internal/client/requestbatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_test(
"//pkg/util/stop",
"//pkg/util/timeutil",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_x_sync//errgroup",
],
)
Expand Down
30 changes: 21 additions & 9 deletions pkg/internal/client/requestbatcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ type Config struct {
// request. If MaxKeysPerBatchReq <= 0 then no limit is enforced.
MaxKeysPerBatchReq int

// TargetBytesPerBatchReq is the desired TargetBytes assigned to the Header
// of each batch request. If TargetBytesPerBatchReq <= 0, then no TargetBytes
// is enforced.
TargetBytesPerBatchReq int64

// MaxWait is the maximum amount of time a message should wait in a batch
// before being sent. If MaxWait is <= 0 then no wait timeout is enforced.
// It is inadvisable to disable both MaxIdle and MaxWait.
Expand Down Expand Up @@ -288,14 +293,14 @@ func (b *RequestBatcher) sendBatch(ctx context.Context, ba *batch) {
}
}
// Send requests in a loop to support pagination, which may be necessary
// if MaxKeysPerBatchReq is set. If so, partial responses with resume
// spans may be returned for requests, indicating that the limit was hit
// before they could complete and that they should be resumed over the
// specified key span. Requests in the batch are neither guaranteed to
// be ordered nor guaranteed to be non-overlapping, so we can make no
// assumptions about the requests that will result in full responses
// (with no resume spans) vs. partial responses vs. empty responses (see
// the comment on roachpb.Header.MaxSpanRequestKeys).
// if MaxKeysPerBatchReq or TargetBytesPerBatchReq is set. If so, partial
// responses with resume spans may be returned for requests, indicating
// that the limit was hit before they could complete and that they should
// be resumed over the specified key span. Requests in the batch are
// neither guaranteed to be ordered nor guaranteed to be non-overlapping,
// so we can make no assumptions about the requests that will result in
// full responses (with no resume spans) vs. partial responses vs. empty
// responses (see the comment on roachpb.Header.MaxSpanRequestKeys).
//
// To accommodate this, we keep track of all partial responses from
// previous iterations. After receiving a batch of responses during an
Expand All @@ -312,14 +317,18 @@ func (b *RequestBatcher) sendBatch(ctx context.Context, ba *batch) {
var res Response
if br != nil {
resp := br.Responses[i].GetInner()
// For response types that do not implement the combinable interface,
// the resume span will be lost after roachpb.CombineResponses and
// resp = prevResp below, so extract the resume span now.
resume := resp.Header().ResumeSpan
if prevResps != nil {
prevResp := prevResps[i]
if cErr := roachpb.CombineResponses(prevResp, resp); cErr != nil {
log.Fatalf(ctx, "%v", cErr)
}
resp = prevResp
}
if resume := resp.Header().ResumeSpan; resume != nil {
if resume != nil {
// Add a trimmed request to the next batch.
h := r.req.Header()
h.SetSpan(*resume)
Expand Down Expand Up @@ -537,6 +546,9 @@ func (b *batch) batchRequest(cfg *Config) *roachpb.BatchRequest {
if cfg.MaxKeysPerBatchReq > 0 {
req.MaxSpanRequestKeys = int64(cfg.MaxKeysPerBatchReq)
}
if cfg.TargetBytesPerBatchReq > 0 {
req.TargetBytes = cfg.TargetBytesPerBatchReq
}
return req
}

Expand Down
25 changes: 25 additions & 0 deletions pkg/internal/client/requestbatcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -546,6 +547,30 @@ func TestMaxKeysPerBatchReq(t *testing.T) {
}
}

// TestTargetBytesPerBatchReq checks that the TargetBytes limit set on an
// outgoing batch request matches the TargetBytesPerBatchReq value passed in
// via the config.
func TestTargetBytesPerBatchReq(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
sc := make(chanSender)
b := New(Config{
// MaxMsgsPerBatch of 1 is chosen so that the first call to Send will
// immediately lead to a batch being sent.
MaxMsgsPerBatch: 1,
Sender: sc,
Stopper: stopper,
// 4 << 20 bytes is 4MB; the assertion below expects this exact value.
TargetBytesPerBatchReq: 4 << 20,
})
respChan := make(chan Response, 1)

err := b.SendWithChan(context.Background(), respChan, 1, &roachpb.GetRequest{})
require.NoError(t, err)
// Receive the batch handed to the sender and verify that its TargetBytes
// carries the configured limit.
s := <-sc
assert.Equal(t, int64(4<<20), s.ba.TargetBytes)
// Reply with an empty batch response. NOTE(review): presumably this unblocks
// the batcher's pending send so the test can shut down cleanly; confirm
// against the chanSender implementation.
s.respChan <- batchResp{}
}

func TestPanicWithNilSender(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ func (ds *DistSender) initAndVerifyBatch(
foundReverse = true

case *roachpb.QueryIntentRequest, *roachpb.EndTxnRequest,
*roachpb.GetRequest, *roachpb.DeleteRequest:
*roachpb.GetRequest, *roachpb.ResolveIntentRequest, *roachpb.DeleteRequest:
// Accepted point requests that can be in batches with limit.

default:
Expand Down
16 changes: 15 additions & 1 deletion pkg/kv/kvserver/intentresolver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,38 @@ go_library(
go_test(
name = "intentresolver_test",
size = "small",
srcs = ["intent_resolver_test.go"],
srcs = [
"intent_resolver_integration_test.go",
"intent_resolver_test.go",
"main_test.go",
],
args = ["-test.timeout=55s"],
embed = [":intentresolver"],
deps = [
"//pkg/base",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/batcheval/result",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)

Expand Down
39 changes: 24 additions & 15 deletions pkg/kv/kvserver/intentresolver/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ const (
// ResumeSpan and the batcher will send a new range request.
intentResolverRangeRequestSize = 200

// intentResolverRequestTargetBytes is the target number of bytes of the
// write batch resulting from an intent resolution request. When exceeded,
// the response will include a ResumeSpan and the batcher will send a new
// intent resolution request.
intentResolverRequestTargetBytes = 4 << 20 // 4MB.

// MaxTxnsPerIntentCleanupBatch is the number of transactions whose
// corresponding intents will be resolved at a time. Intents are batched
// by transaction to avoid timeouts while resolving intents and ensure that
Expand Down Expand Up @@ -225,24 +231,27 @@ func New(c Config) *IntentResolver {
intentResolutionBatchSize = c.TestingKnobs.MaxIntentResolutionBatchSize
intentResolutionRangeBatchSize = c.TestingKnobs.MaxIntentResolutionBatchSize
}
targetBytesPerBatchReq := int64(intentResolverRequestTargetBytes)
ir.irBatcher = requestbatcher.New(requestbatcher.Config{
AmbientCtx: c.AmbientCtx,
Name: "intent_resolver_ir_batcher",
MaxMsgsPerBatch: intentResolutionBatchSize,
MaxWait: c.MaxIntentResolutionBatchWait,
MaxIdle: c.MaxIntentResolutionBatchIdle,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
AmbientCtx: c.AmbientCtx,
Name: "intent_resolver_ir_batcher",
MaxMsgsPerBatch: intentResolutionBatchSize,
TargetBytesPerBatchReq: targetBytesPerBatchReq,
MaxWait: c.MaxIntentResolutionBatchWait,
MaxIdle: c.MaxIntentResolutionBatchIdle,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
})
ir.irRangeBatcher = requestbatcher.New(requestbatcher.Config{
AmbientCtx: c.AmbientCtx,
Name: "intent_resolver_ir_range_batcher",
MaxMsgsPerBatch: intentResolutionRangeBatchSize,
MaxKeysPerBatchReq: intentResolverRangeRequestSize,
MaxWait: c.MaxIntentResolutionBatchWait,
MaxIdle: c.MaxIntentResolutionBatchIdle,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
AmbientCtx: c.AmbientCtx,
Name: "intent_resolver_ir_range_batcher",
MaxMsgsPerBatch: intentResolutionRangeBatchSize,
MaxKeysPerBatchReq: intentResolverRangeRequestSize,
TargetBytesPerBatchReq: targetBytesPerBatchReq,
MaxWait: c.MaxIntentResolutionBatchWait,
MaxIdle: c.MaxIntentResolutionBatchIdle,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
})
return ir
}
Expand Down
Loading

0 comments on commit 37d01c3

Please sign in to comment.