diff --git a/pkg/internal/client/requestbatcher/BUILD.bazel b/pkg/internal/client/requestbatcher/BUILD.bazel index 54c09ec45d81..2562f9f97e62 100644 --- a/pkg/internal/client/requestbatcher/BUILD.bazel +++ b/pkg/internal/client/requestbatcher/BUILD.bazel @@ -31,6 +31,7 @@ go_test( "//pkg/util/stop", "//pkg/util/timeutil", "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", "@org_golang_x_sync//errgroup", ], ) diff --git a/pkg/internal/client/requestbatcher/batcher.go b/pkg/internal/client/requestbatcher/batcher.go index 543694d418af..2f6f7b9399ce 100644 --- a/pkg/internal/client/requestbatcher/batcher.go +++ b/pkg/internal/client/requestbatcher/batcher.go @@ -120,6 +120,11 @@ type Config struct { // request. If MaxKeysPerBatchReq <= 0 then no limit is enforced. MaxKeysPerBatchReq int + // TargetBytesPerBatchReq is the desired TargetBytes assigned to the Header + // of each batch request. If TargetBytesPerBatchReq <= 0, then no TargetBytes + // is enforced. + TargetBytesPerBatchReq int64 + // MaxWait is the maximum amount of time a message should wait in a batch // before being sent. If MaxWait is <= 0 then no wait timeout is enforced. // It is inadvisable to disable both MaxIdle and MaxWait. @@ -289,14 +294,14 @@ func (b *RequestBatcher) sendBatch(ctx context.Context, ba *batch) { } } // Send requests in a loop to support pagination, which may be necessary - // if MaxKeysPerBatchReq is set. If so, partial responses with resume - // spans may be returned for requests, indicating that the limit was hit - // before they could complete and that they should be resumed over the - // specified key span. Requests in the batch are neither guaranteed to - // be ordered nor guaranteed to be non-overlapping, so we can make no - // assumptions about the requests that will result in full responses - // (with no resume spans) vs. partial responses vs. empty responses (see - // the comment on kvpb.Header.MaxSpanRequestKeys). 
+ // if MaxKeysPerBatchReq or TargetBytesPerBatchReq is set. If so, partial + // responses with resume spans may be returned for requests, indicating + // that the limit was hit before they could complete and that they should + // be resumed over the specified key span. Requests in the batch are + // neither guaranteed to be ordered nor guaranteed to be non-overlapping, + // so we can make no assumptions about the requests that will result in + // full responses (with no resume spans) vs. partial responses vs. empty + // responses (see the comment on kvpb.Header.MaxSpanRequestKeys). // // To accommodate this, we keep track of all partial responses from // previous iterations. After receiving a batch of responses during an @@ -538,6 +543,9 @@ func (b *batch) batchRequest(cfg *Config) *kvpb.BatchRequest { if cfg.MaxKeysPerBatchReq > 0 { req.MaxSpanRequestKeys = int64(cfg.MaxKeysPerBatchReq) } + if cfg.TargetBytesPerBatchReq > 0 { + req.TargetBytes = cfg.TargetBytesPerBatchReq + } return req } diff --git a/pkg/internal/client/requestbatcher/batcher_test.go b/pkg/internal/client/requestbatcher/batcher_test.go index 40960168f83b..40962ee1cef6 100644 --- a/pkg/internal/client/requestbatcher/batcher_test.go +++ b/pkg/internal/client/requestbatcher/batcher_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) @@ -547,6 +548,30 @@ func TestMaxKeysPerBatchReq(t *testing.T) { } } +// TestTargetBytesPerBatchReq checks that the correct TargetBytes limit is set +// according to the TargetBytesPerBatchReq passed in via the config. 
+func TestTargetBytesPerBatchReq(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper := stop.NewStopper() + defer stopper.Stop(context.Background()) + sc := make(chanSender) + b := New(Config{ + // MaxMsgsPerBatch of 1 is chosen so that the first call to Send will + // immediately lead to a batch being sent. + MaxMsgsPerBatch: 1, + Sender: sc, + Stopper: stopper, + TargetBytesPerBatchReq: 4 << 20, + }) + respChan := make(chan Response, 1) + + err := b.SendWithChan(context.Background(), respChan, 1, &roachpb.GetRequest{}) + require.NoError(t, err) + s := <-sc + assert.Equal(t, int64(4<<20), s.ba.TargetBytes) + s.respChan <- batchResp{} +} + func TestPanicWithNilSender(t *testing.T) { defer leaktest.AfterTest(t)() stopper := stop.NewStopper() diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index e9d75698bed2..a0300f31ac5a 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -700,7 +700,7 @@ func (ds *DistSender) initAndVerifyBatch(ctx context.Context, ba *kvpb.BatchRequ foundReverse = true case *kvpb.QueryIntentRequest, *kvpb.EndTxnRequest, - *kvpb.GetRequest, *kvpb.DeleteRequest: + *kvpb.GetRequest, *kvpb.ResolveIntentRequest, *kvpb.DeleteRequest: // Accepted point requests that can be in batches with limit. default: diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index e5c2b2167912..38bfa2fd89b6 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -393,6 +393,19 @@ func (dr *RevertRangeResponse) combine(c combinable) error { var _ combinable = &RevertRangeResponse{} +// combine implements the combinable interface. +func (rr *ResolveIntentResponse) combine(c combinable) error { + otherRR := c.(*ResolveIntentResponse) + if rr != nil { + if err := rr.ResponseHeader.combine(otherRR.Header()); err != nil { + return err + } + } + return nil +} + +var _ combinable = &ResolveIntentResponse{} + // combine implements the combinable interface. 
func (rr *ResolveIntentRangeResponse) combine(c combinable) error { otherRR := c.(*ResolveIntentRangeResponse) diff --git a/pkg/kv/kvserver/intentresolver/BUILD.bazel b/pkg/kv/kvserver/intentresolver/BUILD.bazel index 9acdc246e938..188628628e40 100644 --- a/pkg/kv/kvserver/intentresolver/BUILD.bazel +++ b/pkg/kv/kvserver/intentresolver/BUILD.bazel @@ -35,25 +35,39 @@ go_library( go_test( name = "intentresolver_test", size = "small", - srcs = ["intent_resolver_test.go"], + srcs = [ + "intent_resolver_integration_test.go", + "intent_resolver_test.go", + "main_test.go", + ], args = ["-test.timeout=55s"], embed = [":intentresolver"], deps = [ + "//pkg/base", "//pkg/kv", "//pkg/kv/kvpb", + "//pkg/kv/kvserver", "//pkg/kv/kvserver/batcheval/result", "//pkg/kv/kvserver/kvserverbase", "//pkg/roachpb", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/storage", "//pkg/storage/enginepb", "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", + "//pkg/util/randutil", "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/kv/kvserver/intentresolver/intent_resolver.go b/pkg/kv/kvserver/intentresolver/intent_resolver.go index 940965efb610..f9592287c4a6 100644 --- a/pkg/kv/kvserver/intentresolver/intent_resolver.go +++ b/pkg/kv/kvserver/intentresolver/intent_resolver.go @@ -74,6 +74,12 @@ const ( // ResumeSpan and the batcher will send a new range request. intentResolverRangeRequestSize = 200 + // intentResolverRequestTargetBytes is the target number of bytes of the + // write batch resulting from an intent resolution request. When exceeded, + // the response will include a ResumeSpan and the batcher will send a new + // intent resolution request. 
+ intentResolverRequestTargetBytes = 4 << 20 // 4MB. + // MaxTxnsPerIntentCleanupBatch is the number of transactions whose // corresponding intents will be resolved at a time. Intents are batched // by transaction to avoid timeouts while resolving intents and ensure that @@ -226,24 +232,27 @@ func New(c Config) *IntentResolver { intentResolutionBatchSize = c.TestingKnobs.MaxIntentResolutionBatchSize intentResolutionRangeBatchSize = c.TestingKnobs.MaxIntentResolutionBatchSize } + targetBytesPerBatchReq := int64(intentResolverRequestTargetBytes) ir.irBatcher = requestbatcher.New(requestbatcher.Config{ - AmbientCtx: c.AmbientCtx, - Name: "intent_resolver_ir_batcher", - MaxMsgsPerBatch: intentResolutionBatchSize, - MaxWait: c.MaxIntentResolutionBatchWait, - MaxIdle: c.MaxIntentResolutionBatchIdle, - Stopper: c.Stopper, - Sender: c.DB.NonTransactionalSender(), + AmbientCtx: c.AmbientCtx, + Name: "intent_resolver_ir_batcher", + MaxMsgsPerBatch: intentResolutionBatchSize, + TargetBytesPerBatchReq: targetBytesPerBatchReq, + MaxWait: c.MaxIntentResolutionBatchWait, + MaxIdle: c.MaxIntentResolutionBatchIdle, + Stopper: c.Stopper, + Sender: c.DB.NonTransactionalSender(), }) ir.irRangeBatcher = requestbatcher.New(requestbatcher.Config{ - AmbientCtx: c.AmbientCtx, - Name: "intent_resolver_ir_range_batcher", - MaxMsgsPerBatch: intentResolutionRangeBatchSize, - MaxKeysPerBatchReq: intentResolverRangeRequestSize, - MaxWait: c.MaxIntentResolutionBatchWait, - MaxIdle: c.MaxIntentResolutionBatchIdle, - Stopper: c.Stopper, - Sender: c.DB.NonTransactionalSender(), + AmbientCtx: c.AmbientCtx, + Name: "intent_resolver_ir_range_batcher", + MaxMsgsPerBatch: intentResolutionRangeBatchSize, + MaxKeysPerBatchReq: intentResolverRangeRequestSize, + TargetBytesPerBatchReq: targetBytesPerBatchReq, + MaxWait: c.MaxIntentResolutionBatchWait, + MaxIdle: c.MaxIntentResolutionBatchIdle, + Stopper: c.Stopper, + Sender: c.DB.NonTransactionalSender(), }) return ir } diff --git 
a/pkg/kv/kvserver/intentresolver/intent_resolver_integration_test.go b/pkg/kv/kvserver/intentresolver/intent_resolver_integration_test.go new file mode 100644 index 000000000000..f6d3bd502845 --- /dev/null +++ b/pkg/kv/kvserver/intentresolver/intent_resolver_integration_test.go @@ -0,0 +1,150 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +package intentresolver_test + +import ( + "context" + gosql "database/sql" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +func getRangeInfoForTable( + ctx context.Context, t *testing.T, db *gosql.DB, servers []*server.TestServer, tableName string, +) (startKey, endKey roachpb.Key, store *kvserver.Store) { + var rangeID roachpb.RangeID + err := db.QueryRow(fmt.Sprintf("select range_id from [show ranges from table %s] limit 1", tableName)).Scan(&rangeID) + require.NoError(t, err) + for _, server := range servers { + require.NoError(t, server.Stores().VisitStores(func(s *kvserver.Store) error { + if replica, err := s.GetReplica(rangeID); err == nil && 
replica.OwnsValidLease(ctx, replica.Clock().NowAsClockTimestamp()) { + desc := replica.Desc() + startKey = desc.StartKey.AsRawKey() + endKey = desc.EndKey.AsRawKey() + store = s + } + return nil + })) + } + return startKey, endKey, store +} + +// TestAsyncIntentResolutionByteSizePagination tests that async intent +// resolution through the IntentResolver has byte size pagination. This is done +// by creating a transaction that first writes to a range (transaction record) +// and then in another range: writes such that the total bytes of the write +// values exceeds the max raft command size and updating the transaction +// timestamp to ensure the key values are written to the raft command during +// intent resolution. The latter intents will be resolved asynchronously in the +// IntentResolver, but the write batch size from intent resolution will exceed +// the max raft command size resulting in an error and not all intents will be +// resolved, unless byte size pagination is implemented. +func TestAsyncIntentResolutionByteSizePagination(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + // Start test cluster. + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + db := tc.ServerConn(0) + const numIntents = 7 + + // Create table t and split into two ranges, the first range consists of + // primary key < numIntents and second range consists of primary key >= + // numIntents. + { + _, err := db.Exec("CREATE TABLE t (i INT PRIMARY KEY, j STRING)") + require.NoError(t, err) + _, err = db.Exec(fmt.Sprintf("ALTER TABLE t SPLIT AT VALUES (%d)", numIntents)) + require.NoError(t, err) + } + + // Set the max raft command size to 5MB. + st := tc.Servers[0].ClusterSettings() + st.Manual.Store(true) + kvserverbase.MaxCommandSize.Override(ctx, &st.SV, 5<<20) + + { + // Create the first transaction. 
+ tx, err := db.Begin() + require.NoError(t, err) + + // Create transaction record on the second range of t to ensure intents + // for t on the first range are resolved asynchronously. + _, err = tx.Exec(fmt.Sprintf("INSERT INTO t (i, j) VALUES (%d, '0')", numIntents)) + require.NoError(t, err) + + // Insert kv pairs whose values exceed max raft command size = 5MB in + // total. This will be inserted into t on the first range. + for i := 0; i < numIntents-1; i++ { + _, err = tx.Exec(fmt.Sprintf("INSERT INTO t(i, j) VALUES (%d, '%01000000d')", i, i)) + require.NoError(t, err) + } + + // Create a later transaction that writes to key numIntents-1. This + // will be inserted into t on the first range. + { + tx2, err := db.Begin() + require.NoError(t, err) + _, err = tx2.Exec(fmt.Sprintf("INSERT INTO t (i, j) VALUES (%d, '0')", numIntents-1)) + require.NoError(t, err) + err = tx2.Commit() + require.NoError(t, err) + } + + // Have the first transaction write to key numIntents-1, which will + // force the transaction to update its timestamp. + _, err = tx.Exec(fmt.Sprintf("UPDATE t SET j = '1' WHERE i = %d", numIntents-1)) + require.NoError(t, err) + + // Commit, which will asynchronously resolve the intents for t, and the + // write batch size from intent resolution will exceed the max raft + // command size resulting in an error and not all intents will be + // resolved, unless byte size pagination is implemented. Below, we + // check that all intents have been resolved. + err = tx.Commit() + require.NoError(t, err) + } + + // Get the store, start key, and end key of the range containing table t. + startKey, endKey, store := getRangeInfoForTable(ctx, t, db, tc.Servers, "t") + + // Check that all intents have been resolved to ensure async intent + // resolution did not exceed the max raft command size, which can only + // happen if byte size pagination was implemented. 
+ testutils.SucceedsSoon(t, func() error { + result, err := storage.MVCCScanToBytes(ctx, store.TODOEngine(), startKey, endKey, + hlc.MaxTimestamp, storage.MVCCScanOptions{Inconsistent: true}) + if err != nil { + return err + } + if intentCount := len(result.Intents); intentCount != 0 { + return errors.Errorf("%d intents still unresolved", intentCount) + } + return nil + }) +} diff --git a/pkg/kv/kvserver/intentresolver/main_test.go b/pkg/kv/kvserver/intentresolver/main_test.go new file mode 100644 index 000000000000..1d6de3e166fa --- /dev/null +++ b/pkg/kv/kvserver/intentresolver/main_test.go @@ -0,0 +1,32 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +package intentresolver_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func init() { + securityassets.SetLoader(securitytest.EmbeddedAssets) +} + +func TestMain(m *testing.M) { + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + + os.Exit(m.Run()) +}