Skip to content

Commit

Permalink
intentresolver: Add byte pagination to RequestBatcher and IntentResolver
Browse files Browse the repository at this point in the history
Informs: cockroachdb#77228

Intent resolution batches are sequenced on raft and each batch can
consist of 100-200 intents. If an intent key, or in some cases even its
value, is large, resolving all intents in the batch could result in a
raft command whose size exceeds the max raft command size,
kv.raft.command.max_size.

In PR cockroachdb#94814, we added support for TargetBytes for resolve intent and
resolve intent range raft commands.

In this PR, we set the TargetBytes field in the resolve intent and
resolve intent range requests in the RequestBatcher and IntentResolver
to 4MB. This allows us to stop resolving intents in the batch before we
get close to the max raft command size to lower the chance we exceed
this limit.

Release note: None
  • Loading branch information
KaiSun314 committed Feb 22, 2023
1 parent e9c16ff commit 7e926ab
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 25 deletions.
1 change: 1 addition & 0 deletions pkg/internal/client/requestbatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_test(
"//pkg/util/stop",
"//pkg/util/timeutil",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_x_sync//errgroup",
],
)
Expand Down
24 changes: 16 additions & 8 deletions pkg/internal/client/requestbatcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ type Config struct {
// request. If MaxKeysPerBatchReq <= 0 then no limit is enforced.
MaxKeysPerBatchReq int

// TargetBytesPerBatchReq is the desired TargetBytes assigned to the Header
// of each batch request. If TargetBytesPerBatchReq <= 0, then no TargetBytes
// is enforced.
TargetBytesPerBatchReq int64

// MaxWait is the maximum amount of time a message should wait in a batch
// before being sent. If MaxWait is <= 0 then no wait timeout is enforced.
// It is inadvisable to disable both MaxIdle and MaxWait.
Expand Down Expand Up @@ -289,14 +294,14 @@ func (b *RequestBatcher) sendBatch(ctx context.Context, ba *batch) {
}
}
// Send requests in a loop to support pagination, which may be necessary
// if MaxKeysPerBatchReq is set. If so, partial responses with resume
// spans may be returned for requests, indicating that the limit was hit
// before they could complete and that they should be resumed over the
// specified key span. Requests in the batch are neither guaranteed to
// be ordered nor guaranteed to be non-overlapping, so we can make no
// assumptions about the requests that will result in full responses
// (with no resume spans) vs. partial responses vs. empty responses (see
// the comment on kvpb.Header.MaxSpanRequestKeys).
// if MaxKeysPerBatchReq or TargetBytesPerBatchReq is set. If so, partial
// responses with resume spans may be returned for requests, indicating
// that the limit was hit before they could complete and that they should
// be resumed over the specified key span. Requests in the batch are
// neither guaranteed to be ordered nor guaranteed to be non-overlapping,
// so we can make no assumptions about the requests that will result in
// full responses (with no resume spans) vs. partial responses vs. empty
// responses (see the comment on kvpb.Header.MaxSpanRequestKeys).
//
// To accommodate this, we keep track of all partial responses from
// previous iterations. After receiving a batch of responses during an
Expand Down Expand Up @@ -538,6 +543,9 @@ func (b *batch) batchRequest(cfg *Config) *kvpb.BatchRequest {
if cfg.MaxKeysPerBatchReq > 0 {
req.MaxSpanRequestKeys = int64(cfg.MaxKeysPerBatchReq)
}
if cfg.TargetBytesPerBatchReq > 0 {
req.TargetBytes = cfg.TargetBytesPerBatchReq
}
return req
}

Expand Down
25 changes: 25 additions & 0 deletions pkg/internal/client/requestbatcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -547,6 +548,30 @@ func TestMaxKeysPerBatchReq(t *testing.T) {
}
}

// TestTargetBytesPerBatchReq verifies that the TargetBytesPerBatchReq value
// supplied through the Config ends up in the TargetBytes field of the batch
// request that is handed to the sender.
func TestTargetBytesPerBatchReq(t *testing.T) {
	defer leaktest.AfterTest(t)()

	// The byte limit configured on the batcher and expected on the request.
	const targetBytes int64 = 4 << 20

	ctx := context.Background()
	stopper := stop.NewStopper()
	defer stopper.Stop(ctx)

	sender := make(chanSender)
	batcher := New(Config{
		// With MaxMsgsPerBatch set to 1, the very first call to Send
		// immediately leads to a batch being sent.
		MaxMsgsPerBatch:        1,
		Sender:                 sender,
		Stopper:                stopper,
		TargetBytesPerBatchReq: targetBytes,
	})

	responses := make(chan Response, 1)
	require.NoError(t, batcher.SendWithChan(ctx, responses, 1, &roachpb.GetRequest{}))

	// Intercept the outgoing batch, check its limit, then unblock the batcher
	// by replying with an empty response.
	sent := <-sender
	assert.Equal(t, targetBytes, sent.ba.TargetBytes)
	sent.respChan <- batchResp{}
}

func TestPanicWithNilSender(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ
foundReverse = true

case *kvpb.QueryIntentRequest, *kvpb.EndTxnRequest,
*kvpb.GetRequest, *kvpb.DeleteRequest:
*kvpb.GetRequest, *kvpb.ResolveIntentRequest, *kvpb.DeleteRequest:
// Accepted point requests that can be in batches with limit.

default:
Expand Down
13 changes: 13 additions & 0 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,19 @@ func (dr *RevertRangeResponse) combine(c combinable) error {

var _ combinable = &RevertRangeResponse{}

// combine implements the combinable interface. The argument is asserted to be
// a *ResolveIntentResponse (any other type panics) and its response header is
// merged into the receiver's header. A nil receiver is left untouched.
func (rr *ResolveIntentResponse) combine(c combinable) error {
	// The assertion intentionally precedes the nil check so that a mismatched
	// type is always reported, even for a nil receiver.
	other := c.(*ResolveIntentResponse)
	if rr == nil {
		return nil
	}
	if err := rr.ResponseHeader.combine(other.Header()); err != nil {
		return err
	}
	return nil
}

var _ combinable = &ResolveIntentResponse{}

// combine implements the combinable interface.
func (rr *ResolveIntentRangeResponse) combine(c combinable) error {
otherRR := c.(*ResolveIntentRangeResponse)
Expand Down
16 changes: 15 additions & 1 deletion pkg/kv/kvserver/intentresolver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,39 @@ go_library(
go_test(
name = "intentresolver_test",
size = "small",
srcs = ["intent_resolver_test.go"],
srcs = [
"intent_resolver_integration_test.go",
"intent_resolver_test.go",
"main_test.go",
],
args = ["-test.timeout=55s"],
embed = [":intentresolver"],
deps = [
"//pkg/base",
"//pkg/kv",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/batcheval/result",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)

Expand Down
39 changes: 24 additions & 15 deletions pkg/kv/kvserver/intentresolver/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ const (
// ResumeSpan and the batcher will send a new range request.
intentResolverRangeRequestSize = 200

// intentResolverRequestTargetBytes is the target number of bytes of the
// write batch resulting from an intent resolution request. When exceeded,
// the response will include a ResumeSpan and the batcher will send a new
// intent resolution request.
intentResolverRequestTargetBytes = 4 << 20 // 4MB.

// MaxTxnsPerIntentCleanupBatch is the number of transactions whose
// corresponding intents will be resolved at a time. Intents are batched
// by transaction to avoid timeouts while resolving intents and ensure that
Expand Down Expand Up @@ -226,24 +232,27 @@ func New(c Config) *IntentResolver {
intentResolutionBatchSize = c.TestingKnobs.MaxIntentResolutionBatchSize
intentResolutionRangeBatchSize = c.TestingKnobs.MaxIntentResolutionBatchSize
}
targetBytesPerBatchReq := int64(intentResolverRequestTargetBytes)
ir.irBatcher = requestbatcher.New(requestbatcher.Config{
AmbientCtx: c.AmbientCtx,
Name: "intent_resolver_ir_batcher",
MaxMsgsPerBatch: intentResolutionBatchSize,
MaxWait: c.MaxIntentResolutionBatchWait,
MaxIdle: c.MaxIntentResolutionBatchIdle,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
AmbientCtx: c.AmbientCtx,
Name: "intent_resolver_ir_batcher",
MaxMsgsPerBatch: intentResolutionBatchSize,
TargetBytesPerBatchReq: targetBytesPerBatchReq,
MaxWait: c.MaxIntentResolutionBatchWait,
MaxIdle: c.MaxIntentResolutionBatchIdle,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
})
ir.irRangeBatcher = requestbatcher.New(requestbatcher.Config{
AmbientCtx: c.AmbientCtx,
Name: "intent_resolver_ir_range_batcher",
MaxMsgsPerBatch: intentResolutionRangeBatchSize,
MaxKeysPerBatchReq: intentResolverRangeRequestSize,
MaxWait: c.MaxIntentResolutionBatchWait,
MaxIdle: c.MaxIntentResolutionBatchIdle,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
AmbientCtx: c.AmbientCtx,
Name: "intent_resolver_ir_range_batcher",
MaxMsgsPerBatch: intentResolutionRangeBatchSize,
MaxKeysPerBatchReq: intentResolverRangeRequestSize,
TargetBytesPerBatchReq: targetBytesPerBatchReq,
MaxWait: c.MaxIntentResolutionBatchWait,
MaxIdle: c.MaxIntentResolutionBatchIdle,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
})
return ir
}
Expand Down
150 changes: 150 additions & 0 deletions pkg/kv/kvserver/intentresolver/intent_resolver_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package intentresolver_test

import (
"context"
gosql "database/sql"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// getRangeInfoForTable looks up the first range reported by SHOW RANGES for
// tableName and returns that range's start key, end key, and the store whose
// replica of the range owns a valid lease at the time of the call.
//
// The test is failed immediately if the range lookup errors, if visiting the
// stores errors, or if no store on any of the given servers owns a valid lease
// for the range (rather than handing a nil store back to the caller).
func getRangeInfoForTable(
	ctx context.Context, t *testing.T, db *gosql.DB, servers []*server.TestServer, tableName string,
) (startKey, endKey roachpb.Key, store *kvserver.Store) {
	t.Helper()

	var rangeID roachpb.RangeID
	query := fmt.Sprintf("select range_id from [show ranges from table %s] limit 1", tableName)
	require.NoError(t, db.QueryRowContext(ctx, query).Scan(&rangeID))

	// The loop variable is deliberately not called "server" so that it does
	// not shadow the imported server package.
	for _, srv := range servers {
		require.NoError(t, srv.Stores().VisitStores(func(s *kvserver.Store) error {
			replica, err := s.GetReplica(rangeID)
			if err != nil {
				// GetReplica failed for this store (presumably it holds no
				// replica of the range); keep looking at the other stores.
				return nil
			}
			if replica.OwnsValidLease(ctx, replica.Clock().NowAsClockTimestamp()) {
				desc := replica.Desc()
				startKey = desc.StartKey.AsRawKey()
				endKey = desc.EndKey.AsRawKey()
				store = s
			}
			return nil
		}))
	}

	// Fail with a clear message instead of letting the caller dereference a
	// nil store.
	require.NotNil(t, store, "no store owns a valid lease for range %d", rangeID)
	return startKey, endKey, store
}

// TestAsyncIntentResolutionByteSizePagination verifies that asynchronous
// intent resolution performed through the IntentResolver is paginated by byte
// size.
//
// The test runs a transaction that first writes to one range (where its
// transaction record is created) and then writes to another range values
// whose total size exceeds the max raft command size. The transaction's
// timestamp is then pushed so that the key values are written to the raft
// command during intent resolution. The intents on the latter range are
// resolved asynchronously by the IntentResolver. Without byte size
// pagination, the write batch produced by intent resolution would exceed the
// max raft command size, resulting in an error and leaving some intents
// unresolved.
func TestAsyncIntentResolutionByteSizePagination(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	const numIntents = 7
	ctx := context.Background()

	// Bring up a single-node test cluster.
	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
	defer tc.Stopper().Stop(ctx)
	db := tc.ServerConn(0)

	// mustExec runs stmt through the supplied Exec method (of either the
	// connection or a transaction) and fails the test on error.
	mustExec := func(exec func(string, ...interface{}) (gosql.Result, error), stmt string) {
		_, err := exec(stmt)
		require.NoError(t, err)
	}

	// Create table t and split it into two ranges: the first range holds
	// primary keys < numIntents, the second holds primary keys >= numIntents.
	mustExec(db.Exec, "CREATE TABLE t (i INT PRIMARY KEY, j STRING)")
	mustExec(db.Exec, fmt.Sprintf("ALTER TABLE t SPLIT AT VALUES (%d)", numIntents))

	// Lower the max raft command size to 5MB.
	settings := tc.Servers[0].ClusterSettings()
	settings.Manual.Store(true)
	kvserverbase.MaxCommandSize.Override(ctx, &settings.SV, 5<<20)

	// Begin the first transaction.
	txn, err := db.Begin()
	require.NoError(t, err)

	// Write to the second range of t first so that the transaction record is
	// created there; this ensures the intents on the first range are resolved
	// asynchronously.
	mustExec(txn.Exec, fmt.Sprintf("INSERT INTO t (i, j) VALUES (%d, '0')", numIntents))

	// Write kv pairs into the first range of t whose values, in total, exceed
	// the 5MB max raft command size.
	for key := 0; key < numIntents-1; key++ {
		mustExec(txn.Exec, fmt.Sprintf("INSERT INTO t(i, j) VALUES (%d, '%01000000d')", key, key))
	}

	// Run and commit a later transaction that writes key numIntents-1, which
	// lives on the first range of t.
	laterTxn, err := db.Begin()
	require.NoError(t, err)
	mustExec(laterTxn.Exec, fmt.Sprintf("INSERT INTO t (i, j) VALUES (%d, '0')", numIntents-1))
	require.NoError(t, laterTxn.Commit())

	// Make the first transaction write to key numIntents-1 as well, which
	// forces it to update its timestamp.
	mustExec(txn.Exec, fmt.Sprintf("UPDATE t SET j = '1' WHERE i = %d", numIntents-1))

	// Committing triggers asynchronous resolution of the intents for t. The
	// write batch from intent resolution would exceed the max raft command
	// size, fail, and leave intents behind unless byte size pagination is in
	// place; the check below confirms that every intent got resolved.
	require.NoError(t, txn.Commit())

	// Get the store, start key, and end key of the range containing table t.
	startKey, endKey, store := getRangeInfoForTable(ctx, t, db, tc.Servers, "t")

	// Wait until no intents remain in the range. This can only happen if
	// async intent resolution stayed below the max raft command size, i.e. if
	// byte size pagination was applied.
	testutils.SucceedsSoon(t, func() error {
		scan, err := storage.MVCCScanToBytes(ctx, store.TODOEngine(), startKey, endKey,
			hlc.MaxTimestamp, storage.MVCCScanOptions{Inconsistent: true})
		if err != nil {
			return err
		}
		if remaining := len(scan.Intents); remaining != 0 {
			return errors.Errorf("%d intents still unresolved", remaining)
		}
		return nil
	})
}
Loading

0 comments on commit 7e926ab

Please sign in to comment.