From 37d01c39a912b0fe4082382493f538d6daebd486 Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Wed, 11 Jan 2023 22:27:18 -0500 Subject: [PATCH] intentresolver: Add byte pagination to RequestBatcher and IntentResolver Informs: #77228 Intent resolution batches are sequenced on raft and each batch can consist of 100-200 intents. If an intent key, or in some cases even a value, is large, it is possible that resolving all intents in the batch would result in a raft command size exceeding the max raft command size kv.raft.command.max_size. In PR #94814, we added support for TargetBytes for resolve intent and resolve intent range raft commands. In this PR, we set the TargetBytes field in the resolve intent and resolve intent range requests in the RequestBatcher and IntentResolver to 4MB. This allows us to stop resolving intents in the batch before we get close to the max raft command size to lower the chance we exceed this limit. Release note: None --- .../client/requestbatcher/BUILD.bazel | 1 + pkg/internal/client/requestbatcher/batcher.go | 30 ++- .../client/requestbatcher/batcher_test.go | 25 +++ pkg/kv/kvclient/kvcoord/dist_sender.go | 2 +- pkg/kv/kvserver/intentresolver/BUILD.bazel | 16 +- .../intentresolver/intent_resolver.go | 39 ++-- .../intent_resolver_integration_test.go | 194 ++++++++++++++++++ pkg/kv/kvserver/intentresolver/main_test.go | 32 +++ 8 files changed, 313 insertions(+), 26 deletions(-) create mode 100644 pkg/kv/kvserver/intentresolver/intent_resolver_integration_test.go create mode 100644 pkg/kv/kvserver/intentresolver/main_test.go diff --git a/pkg/internal/client/requestbatcher/BUILD.bazel b/pkg/internal/client/requestbatcher/BUILD.bazel index dac8835030d9..4c36e3bfd611 100644 --- a/pkg/internal/client/requestbatcher/BUILD.bazel +++ b/pkg/internal/client/requestbatcher/BUILD.bazel @@ -29,6 +29,7 @@ go_test( "//pkg/util/stop", "//pkg/util/timeutil", "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require",
"@org_golang_x_sync//errgroup", ], ) diff --git a/pkg/internal/client/requestbatcher/batcher.go b/pkg/internal/client/requestbatcher/batcher.go index 90012e3616a2..ce9ccef9f20c 100644 --- a/pkg/internal/client/requestbatcher/batcher.go +++ b/pkg/internal/client/requestbatcher/batcher.go @@ -119,6 +119,11 @@ type Config struct { // request. If MaxKeysPerBatchReq <= 0 then no limit is enforced. MaxKeysPerBatchReq int + // TargetBytesPerBatchReq is the desired TargetBytes assigned to the Header + // of each batch request. If TargetBytesPerBatchReq <= 0, then no TargetBytes + // is enforced. + TargetBytesPerBatchReq int64 + // MaxWait is the maximum amount of time a message should wait in a batch // before being sent. If MaxWait is <= 0 then no wait timeout is enforced. // It is inadvisable to disable both MaxIdle and MaxWait. @@ -288,14 +293,14 @@ func (b *RequestBatcher) sendBatch(ctx context.Context, ba *batch) { } } // Send requests in a loop to support pagination, which may be necessary - // if MaxKeysPerBatchReq is set. If so, partial responses with resume - // spans may be returned for requests, indicating that the limit was hit - // before they could complete and that they should be resumed over the - // specified key span. Requests in the batch are neither guaranteed to - // be ordered nor guaranteed to be non-overlapping, so we can make no - // assumptions about the requests that will result in full responses - // (with no resume spans) vs. partial responses vs. empty responses (see - // the comment on roachpb.Header.MaxSpanRequestKeys). + // if MaxKeysPerBatchReq or TargetBytesPerBatchReq is set. If so, partial + // responses with resume spans may be returned for requests, indicating + // that the limit was hit before they could complete and that they should + // be resumed over the specified key span. 
Requests in the batch are + // neither guaranteed to be ordered nor guaranteed to be non-overlapping, + // so we can make no assumptions about the requests that will result in + // full responses (with no resume spans) vs. partial responses vs. empty + // responses (see the comment on roachpb.Header.MaxSpanRequestKeys). // // To accommodate this, we keep track of all partial responses from // previous iterations. After receiving a batch of responses during an @@ -312,6 +317,10 @@ func (b *RequestBatcher) sendBatch(ctx context.Context, ba *batch) { var res Response if br != nil { resp := br.Responses[i].GetInner() + // For response types that do not implement the combinable interface, + // the resume span will be lost after roachpb.CombineResponses and + // resp = prevResp below, so extract the resume span now. + resume := resp.Header().ResumeSpan if prevResps != nil { prevResp := prevResps[i] if cErr := roachpb.CombineResponses(prevResp, resp); cErr != nil { @@ -319,7 +328,7 @@ func (b *RequestBatcher) sendBatch(ctx context.Context, ba *batch) { } resp = prevResp } - if resume := resp.Header().ResumeSpan; resume != nil { + if resume != nil { // Add a trimmed request to the next batch. 
h := r.req.Header() + h.SetSpan(*resume) @@ -537,6 +546,9 @@ func (b *batch) batchRequest(cfg *Config) *roachpb.BatchRequest { if cfg.MaxKeysPerBatchReq > 0 { req.MaxSpanRequestKeys = int64(cfg.MaxKeysPerBatchReq) } + if cfg.TargetBytesPerBatchReq > 0 { + req.TargetBytes = cfg.TargetBytesPerBatchReq + } return req } diff --git a/pkg/internal/client/requestbatcher/batcher_test.go b/pkg/internal/client/requestbatcher/batcher_test.go index 2ab6be9b40b2..945566faab60 100644 --- a/pkg/internal/client/requestbatcher/batcher_test.go +++ b/pkg/internal/client/requestbatcher/batcher_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) @@ -546,6 +547,30 @@ func TestMaxKeysPerBatchReq(t *testing.T) { } } +// TestTargetBytesPerBatchReq checks that the correct TargetBytes limit is set +// according to the TargetBytesPerBatchReq passed in via the config. +func TestTargetBytesPerBatchReq(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper := stop.NewStopper() + defer stopper.Stop(context.Background()) + sc := make(chanSender) + b := New(Config{ + // MaxMsgsPerBatch of 1 is chosen so that the first call to Send will + // immediately lead to a batch being sent.
+ MaxMsgsPerBatch: 1, + Sender: sc, + Stopper: stopper, + TargetBytesPerBatchReq: 4 << 20, + }) + respChan := make(chan Response, 1) + + err := b.SendWithChan(context.Background(), respChan, 1, &roachpb.GetRequest{}) + require.NoError(t, err) + s := <-sc + assert.Equal(t, int64(4<<20), s.ba.TargetBytes) + s.respChan <- batchResp{} +} + func TestPanicWithNilSender(t *testing.T) { defer leaktest.AfterTest(t)() stopper := stop.NewStopper() diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 2eb47402abfa..8b3ab274c708 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -701,7 +701,7 @@ func (ds *DistSender) initAndVerifyBatch( foundReverse = true case *roachpb.QueryIntentRequest, *roachpb.EndTxnRequest, - *roachpb.GetRequest, *roachpb.DeleteRequest: + *roachpb.GetRequest, *roachpb.ResolveIntentRequest, *roachpb.DeleteRequest: // Accepted point requests that can be in batches with limit. default: diff --git a/pkg/kv/kvserver/intentresolver/BUILD.bazel b/pkg/kv/kvserver/intentresolver/BUILD.bazel index f70fb403dca2..26b5876012e1 100644 --- a/pkg/kv/kvserver/intentresolver/BUILD.bazel +++ b/pkg/kv/kvserver/intentresolver/BUILD.bazel @@ -34,24 +34,38 @@ go_library( go_test( name = "intentresolver_test", size = "small", - srcs = ["intent_resolver_test.go"], + srcs = [ + "intent_resolver_integration_test.go", + "intent_resolver_test.go", + "main_test.go", + ], args = ["-test.timeout=55s"], embed = [":intentresolver"], deps = [ + "//pkg/base", "//pkg/kv", + "//pkg/kv/kvserver", "//pkg/kv/kvserver/batcheval/result", "//pkg/kv/kvserver/kvserverbase", "//pkg/roachpb", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/storage", "//pkg/storage/enginepb", "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", + "//pkg/util/randutil", "//pkg/util/stop", 
"//pkg/util/syncutil", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/kv/kvserver/intentresolver/intent_resolver.go b/pkg/kv/kvserver/intentresolver/intent_resolver.go index f7032e486158..2722d615b571 100644 --- a/pkg/kv/kvserver/intentresolver/intent_resolver.go +++ b/pkg/kv/kvserver/intentresolver/intent_resolver.go @@ -73,6 +73,12 @@ const ( // ResumeSpan and the batcher will send a new range request. intentResolverRangeRequestSize = 200 + // intentResolverRequestTargetBytes is the target number of bytes of the + // write batch resulting from an intent resolution request. When exceeded, + // the response will include a ResumeSpan and the batcher will send a new + // intent resolution request. + intentResolverRequestTargetBytes = 4 << 20 // 4MB. + // MaxTxnsPerIntentCleanupBatch is the number of transactions whose // corresponding intents will be resolved at a time. Intents are batched // by transaction to avoid timeouts while resolving intents and ensure that @@ -225,24 +231,27 @@ func New(c Config) *IntentResolver { intentResolutionBatchSize = c.TestingKnobs.MaxIntentResolutionBatchSize intentResolutionRangeBatchSize = c.TestingKnobs.MaxIntentResolutionBatchSize } + targetBytesPerBatchReq := int64(intentResolverRequestTargetBytes) ir.irBatcher = requestbatcher.New(requestbatcher.Config{ - AmbientCtx: c.AmbientCtx, - Name: "intent_resolver_ir_batcher", - MaxMsgsPerBatch: intentResolutionBatchSize, - MaxWait: c.MaxIntentResolutionBatchWait, - MaxIdle: c.MaxIntentResolutionBatchIdle, - Stopper: c.Stopper, - Sender: c.DB.NonTransactionalSender(), + AmbientCtx: c.AmbientCtx, + Name: "intent_resolver_ir_batcher", + MaxMsgsPerBatch: intentResolutionBatchSize, + TargetBytesPerBatchReq: targetBytesPerBatchReq, + MaxWait: c.MaxIntentResolutionBatchWait, + MaxIdle: c.MaxIntentResolutionBatchIdle, + Stopper: c.Stopper, + Sender: 
c.DB.NonTransactionalSender(), }) ir.irRangeBatcher = requestbatcher.New(requestbatcher.Config{ - AmbientCtx: c.AmbientCtx, - Name: "intent_resolver_ir_range_batcher", - MaxMsgsPerBatch: intentResolutionRangeBatchSize, - MaxKeysPerBatchReq: intentResolverRangeRequestSize, - MaxWait: c.MaxIntentResolutionBatchWait, - MaxIdle: c.MaxIntentResolutionBatchIdle, - Stopper: c.Stopper, - Sender: c.DB.NonTransactionalSender(), + AmbientCtx: c.AmbientCtx, + Name: "intent_resolver_ir_range_batcher", + MaxMsgsPerBatch: intentResolutionRangeBatchSize, + MaxKeysPerBatchReq: intentResolverRangeRequestSize, + TargetBytesPerBatchReq: targetBytesPerBatchReq, + MaxWait: c.MaxIntentResolutionBatchWait, + MaxIdle: c.MaxIntentResolutionBatchIdle, + Stopper: c.Stopper, + Sender: c.DB.NonTransactionalSender(), }) return ir } diff --git a/pkg/kv/kvserver/intentresolver/intent_resolver_integration_test.go b/pkg/kv/kvserver/intentresolver/intent_resolver_integration_test.go new file mode 100644 index 000000000000..58b872e30aa5 --- /dev/null +++ b/pkg/kv/kvserver/intentresolver/intent_resolver_integration_test.go @@ -0,0 +1,194 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+package intentresolver_test + +import ( + "context" + gosql "database/sql" + "fmt" + "strconv" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +func getRangeInfoForTable( + ctx context.Context, t *testing.T, db *gosql.DB, servers []*server.TestServer, tableName string, +) (startKey, endKey roachpb.Key, store *kvserver.Store) { + var rangeID roachpb.RangeID + err := db.QueryRow(fmt.Sprintf("select range_id from [show ranges from table %s] limit 1", tableName)).Scan(&rangeID) + require.NoError(t, err) + for _, server := range servers { + require.NoError(t, server.Stores().VisitStores(func(s *kvserver.Store) error { + if replica, err := s.GetReplica(rangeID); err == nil && replica.OwnsValidLease(ctx, replica.Clock().NowAsClockTimestamp()) { + desc := replica.Desc() + startKey = desc.StartKey.AsRawKey() + endKey = desc.EndKey.AsRawKey() + store = s + } + return nil + })) + } + return startKey, endKey, store +} + +func forceScanOnAllReplicationQueues(tc *testcluster.TestCluster) (err error) { + for _, s := range tc.Servers { + err = s.Stores().VisitStores(func(store *kvserver.Store) error { + return store.ForceReplicationScanAndProcess() + }) + } + return err +} + +// TestAsyncIntentResolutionByteSizePagination tests that async intent +// resolution through the IntentResolver has byte size pagination. 
This is done +// by creating a transaction that first writes to a range (transaction record) +// and then in another range: writes such that the total bytes of the write +// values exceeds the max raft command size and updating the transaction +// timestamp to ensure the key values are written to the raft command during +// intent resolution. The latter intents will be resolved asynchronously in the +// IntentResolver, but the write batch size from intent resolution will exceed +// the max raft command size resulting in an error and not all intents will be +// resolved, unless byte size pagination is implemented. +func TestAsyncIntentResolutionByteSizePagination(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + const numNodes = 2 + serverArgs := make(map[int]base.TestServerArgs) + for i := 1; i <= numNodes; i++ { + serverArgs[i-1] = base.TestServerArgs{ + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + { + Key: "rack", Value: strconv.Itoa(i), + }, + }, + }, + } + } + + // Start test cluster. + clusterArgs := base.TestClusterArgs{ + ReplicationMode: base.ReplicationAuto, + ServerArgsPerNode: serverArgs, + } + tc := testcluster.StartTestCluster(t, numNodes, clusterArgs) + defer tc.Stopper().Stop(ctx) + + db := tc.ServerConn(0) + + // Create 2 tables t1 and t2 and wait for t1 to be replicated on store 1 and + // t2 to be replicated on store 2. 
+ numTables := 2 + for i := 1; i <= numTables; i++ { + _, err := db.Exec(fmt.Sprintf("CREATE TABLE t%d (i INT PRIMARY KEY, j STRING)", i)) + require.NoError(t, err) + _, err = db.Exec(fmt.Sprintf("ALTER TABLE t%d CONFIGURE ZONE USING num_replicas = 1, constraints = '{\"+rack=%d\": 1}'", i, i)) + require.NoError(t, err) + } + testutils.SucceedsSoon(t, func() error { + if err := forceScanOnAllReplicationQueues(tc); err != nil { + return err + } + for i := 1; i <= numTables; i++ { + r := db.QueryRow(fmt.Sprintf("select replicas from [show ranges from table t%d]", i)) + var repl string + if err := r.Scan(&repl); err != nil { + return err + } + if repl != fmt.Sprintf("{%d}", i) { + return errors.Newf("Expected replicas {%d} for table t%d, got %s", i, i, repl) + } + } + return nil + }) + + // Set the max raft command size to 5MB. + for _, server := range tc.Servers { + st := server.ClusterSettings() + st.Manual.Store(true) + kvserverbase.MaxCommandSize.Override(ctx, &st.SV, 5<<20) + } + + const numIntents = 7 + { + tx, err := db.Begin() + require.NoError(t, err) + + // Create transaction record on a different range from t1 to ensure intents + // for t1 are resolved asynchronously. + _, err = tx.Exec("INSERT INTO t2 (i, j) VALUES (0, '0')") + require.NoError(t, err) + + // Insert kv pairs whose values exceed max raft command size = 5MB in + // total. + for i := 0; i < numIntents-1; i++ { + _, err = tx.Exec(fmt.Sprintf("INSERT INTO t1 (i, j) VALUES (%d, '%01000000d')", i, i)) + require.NoError(t, err) + } + + // Create a later transaction that writes to key numIntents-1. + { + tx2, err := db.Begin() + require.NoError(t, err) + _, err = tx2.Exec(fmt.Sprintf("INSERT INTO t1 (i, j) VALUES (%d, '0')", numIntents-1)) + require.NoError(t, err) + err = tx2.Commit() + require.NoError(t, err) + } + + // Have the first transaction write to key numIntents-1, which will force + // the transaction to update its timestamp. 
+ _, err = tx.Exec(fmt.Sprintf("UPDATE t1 SET j = '1' WHERE i = %d", numIntents-1)) + require.NoError(t, err) + + // Commit, which will asynchronously resolve the intents for t1, and the + // write batch size from intent resolution will exceed the max raft command + // size resulting in an error and not all intents will be resolved, unless + // byte size pagination is implemented. Below, we check that all intents + // have been resolved. + err = tx.Commit() + require.NoError(t, err) + } + + // Get the store, start key, and end key of the range containing table t1. + startKey, endKey, store := getRangeInfoForTable(ctx, t, db, tc.Servers, "t1") + + // Check that all intents have been resolved to ensure async intent + // resolution did not exceed the max raft command size, which can only happen + // if byte size pagination was implemented. + testutils.SucceedsSoon(t, func() error { + result, err := storage.MVCCScanToBytes(ctx, store.Engine(), startKey, endKey, + hlc.MaxTimestamp, storage.MVCCScanOptions{Inconsistent: true}) + if err != nil { + return err + } + if intentCount := len(result.Intents); intentCount != 0 { + return errors.Errorf("%d intents still unresolved", intentCount) + } + return nil + }) +} diff --git a/pkg/kv/kvserver/intentresolver/main_test.go b/pkg/kv/kvserver/intentresolver/main_test.go new file mode 100644 index 000000000000..1d6de3e166fa --- /dev/null +++ b/pkg/kv/kvserver/intentresolver/main_test.go @@ -0,0 +1,32 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt.
+package intentresolver_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func init() { + securityassets.SetLoader(securitytest.EmbeddedAssets) +} + +func TestMain(m *testing.M) { + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + + os.Exit(m.Run()) +}