diff --git a/pkg/internal/client/requestbatcher/BUILD.bazel b/pkg/internal/client/requestbatcher/BUILD.bazel index dac8835030d9..4c36e3bfd611 100644 --- a/pkg/internal/client/requestbatcher/BUILD.bazel +++ b/pkg/internal/client/requestbatcher/BUILD.bazel @@ -29,6 +29,7 @@ go_test( "//pkg/util/stop", "//pkg/util/timeutil", "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", "@org_golang_x_sync//errgroup", ], ) diff --git a/pkg/internal/client/requestbatcher/batcher.go b/pkg/internal/client/requestbatcher/batcher.go index 90012e3616a2..ce9ccef9f20c 100644 --- a/pkg/internal/client/requestbatcher/batcher.go +++ b/pkg/internal/client/requestbatcher/batcher.go @@ -119,6 +119,11 @@ type Config struct { // request. If MaxKeysPerBatchReq <= 0 then no limit is enforced. MaxKeysPerBatchReq int + // TargetBytesPerBatchReq is the desired TargetBytes assigned to the Header + // of each batch request. If TargetBytesPerBatchReq <= 0, then no TargetBytes + // is enforced. + TargetBytesPerBatchReq int64 + // MaxWait is the maximum amount of time a message should wait in a batch // before being sent. If MaxWait is <= 0 then no wait timeout is enforced. // It is inadvisable to disable both MaxIdle and MaxWait. @@ -288,14 +293,14 @@ func (b *RequestBatcher) sendBatch(ctx context.Context, ba *batch) { } } // Send requests in a loop to support pagination, which may be necessary - // if MaxKeysPerBatchReq is set. If so, partial responses with resume - // spans may be returned for requests, indicating that the limit was hit - // before they could complete and that they should be resumed over the - // specified key span. Requests in the batch are neither guaranteed to - // be ordered nor guaranteed to be non-overlapping, so we can make no - // assumptions about the requests that will result in full responses - // (with no resume spans) vs. partial responses vs. empty responses (see - // the comment on roachpb.Header.MaxSpanRequestKeys). 
+ // if MaxKeysPerBatchReq or TargetBytesPerBatchReq is set. If so, partial + // responses with resume spans may be returned for requests, indicating + // that the limit was hit before they could complete and that they should + // be resumed over the specified key span. Requests in the batch are + // neither guaranteed to be ordered nor guaranteed to be non-overlapping, + // so we can make no assumptions about the requests that will result in + // full responses (with no resume spans) vs. partial responses vs. empty + // responses (see the comment on roachpb.Header.MaxSpanRequestKeys). // // To accommodate this, we keep track of all partial responses from // previous iterations. After receiving a batch of responses during an @@ -312,6 +317,10 @@ func (b *RequestBatcher) sendBatch(ctx context.Context, ba *batch) { var res Response if br != nil { resp := br.Responses[i].GetInner() + // For response types that do not implement the combinable interface, + // the resume span will be lost after roachpb.CombineResponses and + // resp = prevResp below, so extract the resume span now. + resume := resp.Header().ResumeSpan if prevResps != nil { prevResp := prevResps[i] if cErr := roachpb.CombineResponses(prevResp, resp); cErr != nil { @@ -319,7 +328,7 @@ func (b *RequestBatcher) sendBatch(ctx context.Context, ba *batch) { } resp = prevResp } - if resume := resp.Header().ResumeSpan; resume != nil { + if resume != nil { // Add a trimmed request to the next batch. 
+ h := r.req.Header() h.SetSpan(*resume) @@ -537,6 +546,9 @@ func (b *batch) batchRequest(cfg *Config) *roachpb.BatchRequest { if cfg.MaxKeysPerBatchReq > 0 { req.MaxSpanRequestKeys = int64(cfg.MaxKeysPerBatchReq) } + if cfg.TargetBytesPerBatchReq > 0 { + req.TargetBytes = cfg.TargetBytesPerBatchReq + } return req } diff --git a/pkg/internal/client/requestbatcher/batcher_test.go b/pkg/internal/client/requestbatcher/batcher_test.go index 2ab6be9b40b2..945566faab60 100644 --- a/pkg/internal/client/requestbatcher/batcher_test.go +++ b/pkg/internal/client/requestbatcher/batcher_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) @@ -546,6 +547,30 @@ func TestMaxKeysPerBatchReq(t *testing.T) { } } +// TestTargetBytesPerBatchReq checks that the correct TargetBytes limit is set +// according to the TargetBytesPerBatchReq value passed in via the config. +func TestTargetBytesPerBatchReq(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper := stop.NewStopper() + defer stopper.Stop(context.Background()) + sc := make(chanSender) + b := New(Config{ + // MaxMsgsPerBatch of 1 is chosen so that the first call to Send will + // immediately lead to a batch being sent.
+ MaxMsgsPerBatch: 1, + Sender: sc, + Stopper: stopper, + TargetBytesPerBatchReq: 4 << 20, + }) + respChan := make(chan Response, 1) + + err := b.SendWithChan(context.Background(), respChan, 1, &roachpb.GetRequest{}) + require.NoError(t, err) + s := <-sc + assert.Equal(t, int64(4<<20), s.ba.TargetBytes) + s.respChan <- batchResp{} +} + func TestPanicWithNilSender(t *testing.T) { defer leaktest.AfterTest(t)() stopper := stop.NewStopper() diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 2eb47402abfa..8b3ab274c708 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -701,7 +701,7 @@ func (ds *DistSender) initAndVerifyBatch( foundReverse = true case *roachpb.QueryIntentRequest, *roachpb.EndTxnRequest, - *roachpb.GetRequest, *roachpb.DeleteRequest: + *roachpb.GetRequest, *roachpb.ResolveIntentRequest, *roachpb.DeleteRequest: // Accepted point requests that can be in batches with limit. default: diff --git a/pkg/kv/kvserver/intentresolver/BUILD.bazel b/pkg/kv/kvserver/intentresolver/BUILD.bazel index f70fb403dca2..26b5876012e1 100644 --- a/pkg/kv/kvserver/intentresolver/BUILD.bazel +++ b/pkg/kv/kvserver/intentresolver/BUILD.bazel @@ -34,24 +34,38 @@ go_library( go_test( name = "intentresolver_test", size = "small", - srcs = ["intent_resolver_test.go"], + srcs = [ + "intent_resolver_integration_test.go", + "intent_resolver_test.go", + "main_test.go", + ], args = ["-test.timeout=55s"], embed = [":intentresolver"], deps = [ + "//pkg/base", "//pkg/kv", + "//pkg/kv/kvserver", "//pkg/kv/kvserver/batcheval/result", "//pkg/kv/kvserver/kvserverbase", "//pkg/roachpb", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/storage", "//pkg/storage/enginepb", "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", + "//pkg/util/randutil", "//pkg/util/stop", 
"//pkg/util/syncutil", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/kv/kvserver/intentresolver/intent_resolver.go b/pkg/kv/kvserver/intentresolver/intent_resolver.go index f7032e486158..2722d615b571 100644 --- a/pkg/kv/kvserver/intentresolver/intent_resolver.go +++ b/pkg/kv/kvserver/intentresolver/intent_resolver.go @@ -73,6 +73,12 @@ const ( // ResumeSpan and the batcher will send a new range request. intentResolverRangeRequestSize = 200 + // intentResolverRequestTargetBytes is the target number of bytes of the + // write batch resulting from an intent resolution request. When exceeded, + // the response will include a ResumeSpan and the batcher will send a new + // intent resolution request. + intentResolverRequestTargetBytes = 4 << 20 // 4MB. + // MaxTxnsPerIntentCleanupBatch is the number of transactions whose // corresponding intents will be resolved at a time. Intents are batched // by transaction to avoid timeouts while resolving intents and ensure that @@ -225,24 +231,27 @@ func New(c Config) *IntentResolver { intentResolutionBatchSize = c.TestingKnobs.MaxIntentResolutionBatchSize intentResolutionRangeBatchSize = c.TestingKnobs.MaxIntentResolutionBatchSize } + targetBytesPerBatchReq := int64(intentResolverRequestTargetBytes) ir.irBatcher = requestbatcher.New(requestbatcher.Config{ - AmbientCtx: c.AmbientCtx, - Name: "intent_resolver_ir_batcher", - MaxMsgsPerBatch: intentResolutionBatchSize, - MaxWait: c.MaxIntentResolutionBatchWait, - MaxIdle: c.MaxIntentResolutionBatchIdle, - Stopper: c.Stopper, - Sender: c.DB.NonTransactionalSender(), + AmbientCtx: c.AmbientCtx, + Name: "intent_resolver_ir_batcher", + MaxMsgsPerBatch: intentResolutionBatchSize, + TargetBytesPerBatchReq: targetBytesPerBatchReq, + MaxWait: c.MaxIntentResolutionBatchWait, + MaxIdle: c.MaxIntentResolutionBatchIdle, + Stopper: c.Stopper, + Sender: 
c.DB.NonTransactionalSender(), }) ir.irRangeBatcher = requestbatcher.New(requestbatcher.Config{ - AmbientCtx: c.AmbientCtx, - Name: "intent_resolver_ir_range_batcher", - MaxMsgsPerBatch: intentResolutionRangeBatchSize, - MaxKeysPerBatchReq: intentResolverRangeRequestSize, - MaxWait: c.MaxIntentResolutionBatchWait, - MaxIdle: c.MaxIntentResolutionBatchIdle, - Stopper: c.Stopper, - Sender: c.DB.NonTransactionalSender(), + AmbientCtx: c.AmbientCtx, + Name: "intent_resolver_ir_range_batcher", + MaxMsgsPerBatch: intentResolutionRangeBatchSize, + MaxKeysPerBatchReq: intentResolverRangeRequestSize, + TargetBytesPerBatchReq: targetBytesPerBatchReq, + MaxWait: c.MaxIntentResolutionBatchWait, + MaxIdle: c.MaxIntentResolutionBatchIdle, + Stopper: c.Stopper, + Sender: c.DB.NonTransactionalSender(), }) return ir } diff --git a/pkg/kv/kvserver/intentresolver/intent_resolver_integration_test.go b/pkg/kv/kvserver/intentresolver/intent_resolver_integration_test.go new file mode 100644 index 000000000000..58b872e30aa5 --- /dev/null +++ b/pkg/kv/kvserver/intentresolver/intent_resolver_integration_test.go @@ -0,0 +1,194 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+package intentresolver_test + +import ( + "context" + gosql "database/sql" + "fmt" + "strconv" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// getRangeInfoForTable looks up the first range listed for tableName and returns +// its start and end keys along with the store whose replica owns a valid lease. +func getRangeInfoForTable( + ctx context.Context, t *testing.T, db *gosql.DB, servers []*server.TestServer, tableName string, +) (startKey, endKey roachpb.Key, store *kvserver.Store) { + var rangeID roachpb.RangeID + err := db.QueryRow(fmt.Sprintf("select range_id from [show ranges from table %s] limit 1", tableName)).Scan(&rangeID) + require.NoError(t, err) + for _, srv := range servers { + require.NoError(t, srv.Stores().VisitStores(func(s *kvserver.Store) error { + if replica, err := s.GetReplica(rangeID); err == nil && replica.OwnsValidLease(ctx, replica.Clock().NowAsClockTimestamp()) { + desc := replica.Desc() + startKey = desc.StartKey.AsRawKey() + endKey = desc.EndKey.AsRawKey() + store = s + } + return nil + })) + } + return startKey, endKey, store +} + +// forceScanOnAllReplicationQueues runs ForceReplicationScanAndProcess on each store in tc. +func forceScanOnAllReplicationQueues(tc *testcluster.TestCluster) (err error) { + for i := 0; err == nil && i < len(tc.Servers); i++ { // stop at the first error so a later success cannot mask it + err = tc.Servers[i].Stores().VisitStores(func(store *kvserver.Store) error { + return store.ForceReplicationScanAndProcess() + }) + } + return err +} + +// TestAsyncIntentResolutionByteSizePagination tests that async intent +// resolution through the IntentResolver has byte size pagination.
This is done +// by creating a transaction that first writes to a range (transaction record) +// and then in another range: writes such that the total bytes of the write +// values exceed the max raft command size and updating the transaction +// timestamp to ensure the key values are written to the raft command during +// intent resolution. The latter intents will be resolved asynchronously in the +// IntentResolver, but the write batch size from intent resolution will exceed +// the max raft command size resulting in an error and not all intents will be +// resolved, unless byte size pagination is implemented. +func TestAsyncIntentResolutionByteSizePagination(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + const numNodes = 2 + serverArgs := make(map[int]base.TestServerArgs) + for i := 1; i <= numNodes; i++ { + serverArgs[i-1] = base.TestServerArgs{ + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + { + Key: "rack", Value: strconv.Itoa(i), + }, + }, + }, + } + } + + // Start test cluster. + clusterArgs := base.TestClusterArgs{ + ReplicationMode: base.ReplicationAuto, + ServerArgsPerNode: serverArgs, + } + tc := testcluster.StartTestCluster(t, numNodes, clusterArgs) + defer tc.Stopper().Stop(ctx) + + db := tc.ServerConn(0) + + // Create 2 tables t1 and t2 and wait for t1 to be replicated on store 1 and + // t2 to be replicated on store 2.
+ numTables := 2 + for i := 1; i <= numTables; i++ { + _, err := db.Exec(fmt.Sprintf("CREATE TABLE t%d (i INT PRIMARY KEY, j STRING)", i)) + require.NoError(t, err) + _, err = db.Exec(fmt.Sprintf("ALTER TABLE t%d CONFIGURE ZONE USING num_replicas = 1, constraints = '{\"+rack=%d\": 1}'", i, i)) + require.NoError(t, err) + } + testutils.SucceedsSoon(t, func() error { + if err := forceScanOnAllReplicationQueues(tc); err != nil { + return err + } + for i := 1; i <= numTables; i++ { + var repl string + if err := db.QueryRow(fmt.Sprintf("select replicas from [show ranges from table t%d]", i)).Scan(&repl); err != nil { + return err + } + if repl != fmt.Sprintf("{%d}", i) { + return errors.Newf("Expected replicas {%d} for table t%d, got %s", i, i, repl) + } + } + return nil + }) + + // Set the max raft command size to 5MB. + for _, srv := range tc.Servers { + st := srv.ClusterSettings() + st.Manual.Store(true) + kvserverbase.MaxCommandSize.Override(ctx, &st.SV, 5<<20) + } + + const numIntents = 7 + { + tx, err := db.Begin() + require.NoError(t, err) + + // Create transaction record on a different range from t1 to ensure intents + // for t1 are resolved asynchronously. + _, err = tx.Exec("INSERT INTO t2 (i, j) VALUES (0, '0')") + require.NoError(t, err) + + // Insert kv pairs whose values exceed max raft command size = 5MB in + // total. + for i := 0; i < numIntents-1; i++ { + _, err = tx.Exec(fmt.Sprintf("INSERT INTO t1 (i, j) VALUES (%d, '%01000000d')", i, i)) + require.NoError(t, err) + } + + // Create a later transaction that writes to key numIntents-1. + { + tx2, err := db.Begin() + require.NoError(t, err) + _, err = tx2.Exec(fmt.Sprintf("INSERT INTO t1 (i, j) VALUES (%d, '0')", numIntents-1)) + require.NoError(t, err) + require.NoError(t, tx2.Commit()) + } + + // Have the first transaction write to key numIntents-1, which will force + // the transaction to update its timestamp.
+ _, err = tx.Exec(fmt.Sprintf("UPDATE t1 SET j = '1' WHERE i = %d", numIntents-1)) + require.NoError(t, err) + + // Commit, which will asynchronously resolve the intents for t1, and the + // write batch size from intent resolution will exceed the max raft command + // size resulting in an error and not all intents will be resolved, unless + // byte size pagination is implemented. Below, we check that all intents + // have been resolved. + require.NoError(t, tx.Commit()) + } + + // Get the store, start key, and end key of the range containing table t1. + startKey, endKey, store := getRangeInfoForTable(ctx, t, db, tc.Servers, "t1") + + // Check that all intents have been resolved to ensure async intent + // resolution did not exceed the max raft command size, which can only happen + // if byte size pagination was implemented. + testutils.SucceedsSoon(t, func() error { + result, err := storage.MVCCScanToBytes(ctx, store.Engine(), startKey, endKey, + hlc.MaxTimestamp, storage.MVCCScanOptions{Inconsistent: true}) + if err != nil { + return err + } + if intentCount := len(result.Intents); intentCount != 0 { + return errors.Errorf("%d intents still unresolved", intentCount) + } + return nil + }) +} diff --git a/pkg/kv/kvserver/intentresolver/main_test.go b/pkg/kv/kvserver/intentresolver/main_test.go new file mode 100644 index 000000000000..1d6de3e166fa --- /dev/null +++ b/pkg/kv/kvserver/intentresolver/main_test.go @@ -0,0 +1,32 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt.
+package intentresolver_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func init() { + securityassets.SetLoader(securitytest.EmbeddedAssets) // use securitytest's embedded assets as the security asset loader +} + +func TestMain(m *testing.M) { + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) // NOTE(review): presumably required by the testcluster-based test in this package; confirm + + os.Exit(m.Run()) // run the package's tests and exit with their status code +}