From 7e926ab1d3180fbaca0b9795b9943b7314202d22 Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Wed, 11 Jan 2023 22:27:18 -0500 Subject: [PATCH] intentresolver: Add byte pagination to RequestBatcher and IntentResolver Informs: #77228 Intent resolution batches are sequenced on raft and each batch can consist of 100-200 intents. If an intent key or even value in some cases are large, it is possible that resolving all intents in the batch would result in a raft command size exceeding the max raft command size kv.raft.command.max_size. In PR #94814, we added support for TargetBytes for resolve intent and resolve intent range raft commands. In this PR, we set the TargetBytes field in the resolve intent and resolve intent range requests in the RequestBatcher and IntentResolver to 4MB. This allows us to stop resolving intents in the batch before we get close to the max raft command size to lower the chance we exceed this limit. Release note: None --- .../client/requestbatcher/BUILD.bazel | 1 + pkg/internal/client/requestbatcher/batcher.go | 24 ++- .../client/requestbatcher/batcher_test.go | 25 +++ pkg/kv/kvclient/kvcoord/dist_sender.go | 2 +- pkg/kv/kvpb/api.go | 13 ++ pkg/kv/kvserver/intentresolver/BUILD.bazel | 16 +- .../intentresolver/intent_resolver.go | 39 +++-- .../intent_resolver_integration_test.go | 150 ++++++++++++++++++ pkg/kv/kvserver/intentresolver/main_test.go | 32 ++++ 9 files changed, 277 insertions(+), 25 deletions(-) create mode 100644 pkg/kv/kvserver/intentresolver/intent_resolver_integration_test.go create mode 100644 pkg/kv/kvserver/intentresolver/main_test.go diff --git a/pkg/internal/client/requestbatcher/BUILD.bazel b/pkg/internal/client/requestbatcher/BUILD.bazel index 54c09ec45d81..2562f9f97e62 100644 --- a/pkg/internal/client/requestbatcher/BUILD.bazel +++ b/pkg/internal/client/requestbatcher/BUILD.bazel @@ -31,6 +31,7 @@ go_test( "//pkg/util/stop", "//pkg/util/timeutil", "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", "@org_golang_x_sync//errgroup", ], ) diff --git a/pkg/internal/client/requestbatcher/batcher.go b/pkg/internal/client/requestbatcher/batcher.go index 543694d418af..2f6f7b9399ce 100644 --- a/pkg/internal/client/requestbatcher/batcher.go +++ b/pkg/internal/client/requestbatcher/batcher.go @@ -120,6 +120,11 @@ type Config struct { // request. If MaxKeysPerBatchReq <= 0 then no limit is enforced. MaxKeysPerBatchReq int + // TargetBytesPerBatchReq is the desired TargetBytes assigned to the Header + // of each batch request. If TargetBytesPerBatchReq <= 0, then no TargetBytes + // is enforced. + TargetBytesPerBatchReq int64 + // MaxWait is the maximum amount of time a message should wait in a batch // before being sent. If MaxWait is <= 0 then no wait timeout is enforced. // It is inadvisable to disable both MaxIdle and MaxWait. @@ -289,14 +294,14 @@ func (b *RequestBatcher) sendBatch(ctx context.Context, ba *batch) { } } // Send requests in a loop to support pagination, which may be necessary - // if MaxKeysPerBatchReq is set. If so, partial responses with resume - // spans may be returned for requests, indicating that the limit was hit - // before they could complete and that they should be resumed over the - // specified key span. Requests in the batch are neither guaranteed to - // be ordered nor guaranteed to be non-overlapping, so we can make no - // assumptions about the requests that will result in full responses - // (with no resume spans) vs. partial responses vs. empty responses (see - // the comment on kvpb.Header.MaxSpanRequestKeys). + // if MaxKeysPerBatchReq or TargetBytesPerBatchReq is set. If so, partial + // responses with resume spans may be returned for requests, indicating + // that the limit was hit before they could complete and that they should + // be resumed over the specified key span. Requests in the batch are + // neither guaranteed to be ordered nor guaranteed to be non-overlapping, + // so we can make no assumptions about the requests that will result in + // full responses (with no resume spans) vs. partial responses vs. empty + // responses (see the comment on kvpb.Header.MaxSpanRequestKeys). // // To accommodate this, we keep track of all partial responses from // previous iterations. After receiving a batch of responses during an @@ -538,6 +543,9 @@ func (b *batch) batchRequest(cfg *Config) *kvpb.BatchRequest { if cfg.MaxKeysPerBatchReq > 0 { req.MaxSpanRequestKeys = int64(cfg.MaxKeysPerBatchReq) } + if cfg.TargetBytesPerBatchReq > 0 { + req.TargetBytes = cfg.TargetBytesPerBatchReq + } return req } diff --git a/pkg/internal/client/requestbatcher/batcher_test.go b/pkg/internal/client/requestbatcher/batcher_test.go index 40960168f83b..40962ee1cef6 100644 --- a/pkg/internal/client/requestbatcher/batcher_test.go +++ b/pkg/internal/client/requestbatcher/batcher_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) @@ -547,6 +548,30 @@ func TestMaxKeysPerBatchReq(t *testing.T) { } } +// TestTargetBytesPerBatchReq checks that the correct TargetBytes limit is set +// according to the TargetBytesPerBatchReqFn passed in via the config. +func TestTargetBytesPerBatchReq(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper := stop.NewStopper() + defer stopper.Stop(context.Background()) + sc := make(chanSender) + b := New(Config{ + // MaxMsgsPerBatch of 1 is chosen so that the first call to Send will + // immediately lead to a batch being sent. + MaxMsgsPerBatch: 1, + Sender: sc, + Stopper: stopper, + TargetBytesPerBatchReq: 4 << 20, + }) + respChan := make(chan Response, 1) + + err := b.SendWithChan(context.Background(), respChan, 1, &roachpb.GetRequest{}) + require.NoError(t, err) + s := <-sc + assert.Equal(t, int64(4<<20), s.ba.TargetBytes) + s.respChan <- batchResp{} +} + func TestPanicWithNilSender(t *testing.T) { defer leaktest.AfterTest(t)() stopper := stop.NewStopper() diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index e9d75698bed2..a0300f31ac5a 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -700,7 +700,7 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ foundReverse = true case *kvpb.QueryIntentRequest, *kvpb.EndTxnRequest, - *kvpb.GetRequest, *kvpb.DeleteRequest: + *kvpb.GetRequest, *kvpb.ResolveIntentRequest, *kvpb.DeleteRequest: // Accepted point requests that can be in batches with limit. default: diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index e5c2b2167912..38bfa2fd89b6 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -393,6 +393,19 @@ func (dr *RevertRangeResponse) combine(c combinable) error { var _ combinable = &RevertRangeResponse{} +// combine implements the combinable interface. +func (rr *ResolveIntentResponse) combine(c combinable) error { + otherRR := c.(*ResolveIntentResponse) + if rr != nil { + if err := rr.ResponseHeader.combine(otherRR.Header()); err != nil { + return err + } + } + return nil +} + +var _ combinable = &ResolveIntentResponse{} + // combine implements the combinable interface. func (rr *ResolveIntentRangeResponse) combine(c combinable) error { otherRR := c.(*ResolveIntentRangeResponse) diff --git a/pkg/kv/kvserver/intentresolver/BUILD.bazel b/pkg/kv/kvserver/intentresolver/BUILD.bazel index 9acdc246e938..188628628e40 100644 --- a/pkg/kv/kvserver/intentresolver/BUILD.bazel +++ b/pkg/kv/kvserver/intentresolver/BUILD.bazel @@ -35,25 +35,39 @@ go_library( go_test( name = "intentresolver_test", size = "small", - srcs = ["intent_resolver_test.go"], + srcs = [ + "intent_resolver_integration_test.go", + "intent_resolver_test.go", + "main_test.go", + ], args = ["-test.timeout=55s"], embed = [":intentresolver"], deps = [ + "//pkg/base", "//pkg/kv", "//pkg/kv/kvpb", + "//pkg/kv/kvserver", "//pkg/kv/kvserver/batcheval/result", "//pkg/kv/kvserver/kvserverbase", "//pkg/roachpb", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/storage", "//pkg/storage/enginepb", "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", + "//pkg/util/randutil", "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/kv/kvserver/intentresolver/intent_resolver.go b/pkg/kv/kvserver/intentresolver/intent_resolver.go index 940965efb610..f9592287c4a6 100644 --- a/pkg/kv/kvserver/intentresolver/intent_resolver.go +++ b/pkg/kv/kvserver/intentresolver/intent_resolver.go @@ -74,6 +74,12 @@ const ( // ResumeSpan and the batcher will send a new range request. intentResolverRangeRequestSize = 200 + // intentResolverRequestTargetBytes is the target number of bytes of the + // write batch resulting from an intent resolution request. When exceeded, + // the response will include a ResumeSpan and the batcher will send a new + // intent resolution request. + intentResolverRequestTargetBytes = 4 << 20 // 4MB. + // MaxTxnsPerIntentCleanupBatch is the number of transactions whose // corresponding intents will be resolved at a time. Intents are batched // by transaction to avoid timeouts while resolving intents and ensure that @@ -226,24 +232,27 @@ func New(c Config) *IntentResolver { intentResolutionBatchSize = c.TestingKnobs.MaxIntentResolutionBatchSize intentResolutionRangeBatchSize = c.TestingKnobs.MaxIntentResolutionBatchSize } + targetBytesPerBatchReq := int64(intentResolverRequestTargetBytes) ir.irBatcher = requestbatcher.New(requestbatcher.Config{ - AmbientCtx: c.AmbientCtx, - Name: "intent_resolver_ir_batcher", - MaxMsgsPerBatch: intentResolutionBatchSize, - MaxWait: c.MaxIntentResolutionBatchWait, - MaxIdle: c.MaxIntentResolutionBatchIdle, - Stopper: c.Stopper, - Sender: c.DB.NonTransactionalSender(), + AmbientCtx: c.AmbientCtx, + Name: "intent_resolver_ir_batcher", + MaxMsgsPerBatch: intentResolutionBatchSize, + TargetBytesPerBatchReq: targetBytesPerBatchReq, + MaxWait: c.MaxIntentResolutionBatchWait, + MaxIdle: c.MaxIntentResolutionBatchIdle, + Stopper: c.Stopper, + Sender: c.DB.NonTransactionalSender(), }) ir.irRangeBatcher = requestbatcher.New(requestbatcher.Config{ - AmbientCtx: c.AmbientCtx, - Name: "intent_resolver_ir_range_batcher", - MaxMsgsPerBatch: intentResolutionRangeBatchSize, - MaxKeysPerBatchReq: intentResolverRangeRequestSize, - MaxWait: c.MaxIntentResolutionBatchWait, - MaxIdle: c.MaxIntentResolutionBatchIdle, - Stopper: c.Stopper, - Sender: c.DB.NonTransactionalSender(), + AmbientCtx: c.AmbientCtx, + Name: "intent_resolver_ir_range_batcher", + MaxMsgsPerBatch: intentResolutionRangeBatchSize, + MaxKeysPerBatchReq: intentResolverRangeRequestSize, + TargetBytesPerBatchReq: targetBytesPerBatchReq, + MaxWait: c.MaxIntentResolutionBatchWait, + MaxIdle: c.MaxIntentResolutionBatchIdle, + Stopper: c.Stopper, + Sender: c.DB.NonTransactionalSender(), }) return ir } diff --git a/pkg/kv/kvserver/intentresolver/intent_resolver_integration_test.go b/pkg/kv/kvserver/intentresolver/intent_resolver_integration_test.go new file mode 100644 index 000000000000..f6d3bd502845 --- /dev/null +++ b/pkg/kv/kvserver/intentresolver/intent_resolver_integration_test.go @@ -0,0 +1,150 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +package intentresolver_test + +import ( + "context" + gosql "database/sql" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +func getRangeInfoForTable( + ctx context.Context, t *testing.T, db *gosql.DB, servers []*server.TestServer, tableName string, +) (startKey, endKey roachpb.Key, store *kvserver.Store) { + var rangeID roachpb.RangeID + err := db.QueryRow(fmt.Sprintf("select range_id from [show ranges from table %s] limit 1", tableName)).Scan(&rangeID) + require.NoError(t, err) + for _, server := range servers { + require.NoError(t, server.Stores().VisitStores(func(s *kvserver.Store) error { + if replica, err := s.GetReplica(rangeID); err == nil && replica.OwnsValidLease(ctx, replica.Clock().NowAsClockTimestamp()) { + desc := replica.Desc() + startKey = desc.StartKey.AsRawKey() + endKey = desc.EndKey.AsRawKey() + store = s + } + return nil + })) + } + return startKey, endKey, store +} + +// TestAsyncIntentResolutionByteSizePagination tests that async intent +// resolution through the IntentResolver has byte size pagination. This is done +// by creating a transaction that first writes to a range (transaction record) +// and then in another range: writes such that the total bytes of the write +// values exceeds the max raft command size and updating the transaction +// timestamp to ensure the key values are written to the raft command during +// intent resolution. The latter intents will be resolved asynchronously in the +// IntentResolver, but the write batch size from intent resolution will exceed +// the max raft command size resulting in an error and not all intents will be +// resolved, unless byte size pagination is implemented. +func TestAsyncIntentResolutionByteSizePagination(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + // Start test cluster. + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + db := tc.ServerConn(0) + const numIntents = 7 + + // Create table t and split into two ranges, the first range consists of + // primary key < numIntents and second range consists of primary key >= + // numIntents. + { + _, err := db.Exec("CREATE TABLE t (i INT PRIMARY KEY, j STRING)") + require.NoError(t, err) + _, err = db.Exec(fmt.Sprintf("ALTER TABLE t SPLIT AT VALUES (%d)", numIntents)) + require.NoError(t, err) + } + + // Set the max raft command size to 5MB. + st := tc.Servers[0].ClusterSettings() + st.Manual.Store(true) + kvserverbase.MaxCommandSize.Override(ctx, &st.SV, 5<<20) + + { + // Create the first transaction. + tx, err := db.Begin() + require.NoError(t, err) + + // Create transaction record on the second range of t to ensure intents + // for t on the first range are resolved asynchronously. + _, err = tx.Exec(fmt.Sprintf("INSERT INTO t (i, j) VALUES (%d, '0')", numIntents)) + require.NoError(t, err) + + // Insert kv pairs whose values exceed max raft command size = 5MB in + // total. This will be inserted into t on the first range. + for i := 0; i < numIntents-1; i++ { + _, err = tx.Exec(fmt.Sprintf("INSERT INTO t(i, j) VALUES (%d, '%01000000d')", i, i)) + require.NoError(t, err) + } + + // Create a later transaction that writes to key numIntents-1. This + // will be inserted into t on the first range. + { + tx2, err := db.Begin() + require.NoError(t, err) + _, err = tx2.Exec(fmt.Sprintf("INSERT INTO t (i, j) VALUES (%d, '0')", numIntents-1)) + require.NoError(t, err) + err = tx2.Commit() + require.NoError(t, err) + } + + // Have the first transaction write to key numIntents-1, which will + // force the transaction to update its timestamp. + _, err = tx.Exec(fmt.Sprintf("UPDATE t SET j = '1' WHERE i = %d", numIntents-1)) + require.NoError(t, err) + + // Commit, which will asynchronously resolve the intents for t, and the + // write batch size from intent resolution will exceed the max raft + // command size resulting in an error and not all intents will be + // resolved, unless byte size pagination is implemented. Below, we + // check that all intents have been resolved. + err = tx.Commit() + require.NoError(t, err) + } + + // Get the store, start key, and end key of the range containing table t. + startKey, endKey, store := getRangeInfoForTable(ctx, t, db, tc.Servers, "t") + + // Check that all intents have been resolved to ensure async intent + // resolution did not exceed the max raft command size, which can only + // happen if byte size pagination was implemented. + testutils.SucceedsSoon(t, func() error { + result, err := storage.MVCCScanToBytes(ctx, store.TODOEngine(), startKey, endKey, + hlc.MaxTimestamp, storage.MVCCScanOptions{Inconsistent: true}) + if err != nil { + return err + } + if intentCount := len(result.Intents); intentCount != 0 { + return errors.Errorf("%d intents still unresolved", intentCount) + } + return nil + }) +} diff --git a/pkg/kv/kvserver/intentresolver/main_test.go b/pkg/kv/kvserver/intentresolver/main_test.go new file mode 100644 index 000000000000..1d6de3e166fa --- /dev/null +++ b/pkg/kv/kvserver/intentresolver/main_test.go @@ -0,0 +1,32 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +package intentresolver_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func init() { + securityassets.SetLoader(securitytest.EmbeddedAssets) +} + +func TestMain(m *testing.M) { + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + + os.Exit(m.Run()) +}