From e98ffcf9adfffaa9356367f205d8304cdd5ff054 Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Sun, 30 Oct 2022 13:00:30 -0400 Subject: [PATCH] intentresolver: Add max timeout to RequestBatcher and IntentResolver There is a limit of 1000 in-flight intent resolution request batches that can be processed at a time before backpressure. We saw a case where an unavailable range caused many intent resolution request batches to get stuck, clogging up the worker pool and starving the other ranges trying to resolve intents. This resulted in more queries timing out. To address this, this patch adds a max timeout to RequestBatcher and IntentResolver to ensure that no worker trying to resolve a batch of intents gets stuck indefinitely, e.g. due to an unavailable range. Release note (ops change): Added max timeout to intent resolution, preventing intent resolution from becoming stuck indefinitely and blocking other ranges attempting to resolve intents. --- pkg/internal/client/requestbatcher/batcher.go | 19 +- .../client/requestbatcher/batcher_test.go | 101 +++++++---- pkg/kv/kvserver/intentresolver/BUILD.bazel | 15 +- .../intentresolver/intent_resolver.go | 62 ++++--- .../intent_resolver_integration_test.go | 169 ++++++++++++++++++ .../intentresolver/intent_resolver_test.go | 116 ++++++++++-- pkg/kv/kvserver/intentresolver/main_test.go | 32 ++++ pkg/kv/kvserver/kvserverbase/knobs.go | 12 ++ 8 files changed, 449 insertions(+), 77 deletions(-) create mode 100644 pkg/kv/kvserver/intentresolver/intent_resolver_integration_test.go create mode 100644 pkg/kv/kvserver/intentresolver/main_test.go diff --git a/pkg/internal/client/requestbatcher/batcher.go b/pkg/internal/client/requestbatcher/batcher.go index 90012e3616a2..9413a757ae7d 100644 --- a/pkg/internal/client/requestbatcher/batcher.go +++ b/pkg/internal/client/requestbatcher/batcher.go @@ -130,6 +130,13 @@ type Config struct { // enforced. It is inadvisable to disable both MaxIdle and MaxWait. 
MaxIdle time.Duration + // MaxTimeout limits the amount of time that sending a batch can run for + // before timing out. This is used to prevent batches from stalling + // indefinitely, for instance due to an unavailable range. If MaxTimeout is + // <= 0, then the send batch timeout is derived from the requests' deadlines + // if they exist. + MaxTimeout time.Duration + // InFlightBackpressureLimit is the number of batches in flight above which // sending clients should experience backpressure. If the batcher has more // requests than this in flight it will not accept new requests until the @@ -280,11 +287,19 @@ func (b *RequestBatcher) sendBatch(ctx context.Context, ba *batch) { } return nil } + var deadline time.Time + if b.cfg.MaxTimeout > 0 { + deadline = timeutil.Now().Add(b.cfg.MaxTimeout) + } if !ba.sendDeadline.IsZero() { + if deadline.IsZero() || ba.sendDeadline.Before(deadline) { + deadline = ba.sendDeadline + } + } + if !deadline.IsZero() { actualSend := send send = func(context.Context) error { - return contextutil.RunWithTimeout( - ctx, b.sendBatchOpName, timeutil.Until(ba.sendDeadline), actualSend) + return contextutil.RunWithTimeout(ctx, b.sendBatchOpName, timeutil.Until(deadline), actualSend) } } // Send requests in a loop to support pagination, which may be necessary diff --git a/pkg/internal/client/requestbatcher/batcher_test.go b/pkg/internal/client/requestbatcher/batcher_test.go index 2ab6be9b40b2..abfc694f4ce8 100644 --- a/pkg/internal/client/requestbatcher/batcher_test.go +++ b/pkg/internal/client/requestbatcher/batcher_test.go @@ -312,46 +312,79 @@ func TestPanicWithNilStopper(t *testing.T) { } // TestBatchTimeout verifies the RequestBatcher uses the context with the -// deadline from the latest call to send. +// deadline from the latest call and max timeout to send. 
func TestBatchTimeout(t *testing.T) { defer leaktest.AfterTest(t)() const timeout = 5 * time.Millisecond stopper := stop.NewStopper() defer stopper.Stop(context.Background()) sc := make(chanSender) - t.Run("WithTimeout", func(t *testing.T) { - b := New(Config{ - // MaxMsgsPerBatch of 1 is chosen so that the first call to Send will - // immediately lead to a batch being sent. - MaxMsgsPerBatch: 1, - Sender: sc, - Stopper: stopper, - }) - // This test attempts to verify that a batch with a request with a timeout - // will be sent with that timeout. The test faces challenges of timing. - // There are several different phases at which the timeout may fire; - // the request may time out before it has been sent to the batcher, it - // may timeout while it is being sent or it may not time out until after - // it has been sent. Each of these cases are handled and verified to ensure - // that the request was indeed sent with a timeout. - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - respChan := make(chan Response, 1) - if err := b.SendWithChan(ctx, respChan, 1, &roachpb.GetRequest{}); err != nil { - testutils.IsError(err, context.DeadlineExceeded.Error()) - return - } - select { - case s := <-sc: - deadline, hasDeadline := s.ctx.Deadline() - assert.True(t, hasDeadline) - assert.True(t, timeutil.Until(deadline) < timeout) - s.respChan <- batchResp{} - case resp := <-respChan: - assert.Nil(t, resp.Resp) - testutils.IsError(resp.Err, context.DeadlineExceeded.Error()) - } - }) + testCases := []struct { + requestTimeout time.Duration + maxTimeout time.Duration + expectedTimeout time.Duration + }{ + { + requestTimeout: timeout, + maxTimeout: 0, + expectedTimeout: timeout, + }, + { + requestTimeout: 0, + maxTimeout: timeout, + expectedTimeout: timeout, + }, + { + requestTimeout: timeout, + maxTimeout: 3 * time.Millisecond, + expectedTimeout: 3 * time.Millisecond, + }, + { + requestTimeout: timeout, + maxTimeout: 7 * time.Millisecond, + 
expectedTimeout: timeout, + }, + } + for _, tc := range testCases { + t.Run(fmt.Sprintf("With%sRequestTimeout%sMaxTimeout", tc.requestTimeout, tc.maxTimeout), + func(t *testing.T) { + b := New(Config{ + // MaxMsgsPerBatch of 1 is chosen so that the first call to Send will + // immediately lead to a batch being sent. + MaxMsgsPerBatch: 1, + Sender: sc, + Stopper: stopper, + MaxTimeout: tc.maxTimeout, + }) + // This test attempts to verify that a batch with a request with a + // timeout will be sent with that timeout. The test faces challenges of + // timing. There are several different phases at which the timeout may + // fire; the request may time out before it has been sent to the + // batcher, it may timeout while it is being sent or it may not time + // out until after it has been sent. Each of these cases are handled + // and verified to ensure that the request was indeed sent with a + // timeout. + ctx, cancel := context.WithTimeout(context.Background(), tc.requestTimeout) + defer cancel() + respChan := make(chan Response, 1) + if err := b.SendWithChan(ctx, respChan, 1, &roachpb.GetRequest{}); err != nil { + testutils.IsError(err, context.DeadlineExceeded.Error()) + return + } + select { + case s := <-sc: + deadline, hasDeadline := s.ctx.Deadline() + assert.True(t, hasDeadline) + assert.True(t, timeutil.Until(deadline) < tc.expectedTimeout) + assert.True(t, timeutil.Until(deadline) > tc.expectedTimeout-time.Millisecond) + s.respChan <- batchResp{} + case resp := <-respChan: + assert.Nil(t, resp.Resp) + testutils.IsError(resp.Err, context.DeadlineExceeded.Error()) + } + }, + ) + } t.Run("NoTimeout", func(t *testing.T) { b := New(Config{ // MaxMsgsPerBatch of 2 is chosen so that the second call to Send will diff --git a/pkg/kv/kvserver/intentresolver/BUILD.bazel b/pkg/kv/kvserver/intentresolver/BUILD.bazel index f70fb403dca2..92cf792fdbfb 100644 --- a/pkg/kv/kvserver/intentresolver/BUILD.bazel +++ b/pkg/kv/kvserver/intentresolver/BUILD.bazel @@ -34,24 +34,37 @@ 
go_library( go_test( name = "intentresolver_test", size = "small", - srcs = ["intent_resolver_test.go"], + srcs = [ + "intent_resolver_integration_test.go", + "intent_resolver_test.go", + "main_test.go", + ], args = ["-test.timeout=55s"], embed = [":intentresolver"], deps = [ + "//pkg/base", "//pkg/kv", + "//pkg/kv/kvserver", "//pkg/kv/kvserver/batcheval/result", "//pkg/kv/kvserver/kvserverbase", "//pkg/roachpb", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/server", "//pkg/storage/enginepb", "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", + "//pkg/util/randutil", "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/kv/kvserver/intentresolver/intent_resolver.go b/pkg/kv/kvserver/intentresolver/intent_resolver.go index f7032e486158..7acf4eaa4d7c 100644 --- a/pkg/kv/kvserver/intentresolver/intent_resolver.go +++ b/pkg/kv/kvserver/intentresolver/intent_resolver.go @@ -73,6 +73,10 @@ const ( // ResumeSpan and the batcher will send a new range request. intentResolverRangeRequestSize = 200 + // intentResolverSendBatchTimeout is the maximum amount of time an intent + // resolution batch request can run for before timeout. + intentResolverSendBatchTimeout = 1 * time.Minute + // MaxTxnsPerIntentCleanupBatch is the number of transactions whose // corresponding intents will be resolved at a time. 
Intents are batched // by transaction to avoid timeouts while resolving intents and ensure that @@ -206,18 +210,28 @@ func New(c Config) *IntentResolver { c.Stopper.AddCloser(ir.sem.Closer("stopper")) ir.mu.inFlightPushes = map[uuid.UUID]int{} ir.mu.inFlightTxnCleanups = map[uuid.UUID]struct{}{} + intentResolutionSendBatchTimeout := intentResolverSendBatchTimeout + if c.TestingKnobs.MaxIntentResolutionSendBatchTimeout != 0 { + intentResolutionSendBatchTimeout = c.TestingKnobs.MaxIntentResolutionSendBatchTimeout + } + inFlightBackpressureLimit := requestbatcher.DefaultInFlightBackpressureLimit + if c.TestingKnobs.InFlightBackpressureLimit != 0 { + inFlightBackpressureLimit = c.TestingKnobs.InFlightBackpressureLimit + } gcBatchSize := gcBatchSize if c.TestingKnobs.MaxIntentResolutionBatchSize > 0 { gcBatchSize = c.TestingKnobs.MaxGCBatchSize } ir.gcBatcher = requestbatcher.New(requestbatcher.Config{ - AmbientCtx: c.AmbientCtx, - Name: "intent_resolver_gc_batcher", - MaxMsgsPerBatch: gcBatchSize, - MaxWait: c.MaxGCBatchWait, - MaxIdle: c.MaxGCBatchIdle, - Stopper: c.Stopper, - Sender: c.DB.NonTransactionalSender(), + AmbientCtx: c.AmbientCtx, + Name: "intent_resolver_gc_batcher", + MaxMsgsPerBatch: gcBatchSize, + MaxWait: c.MaxGCBatchWait, + MaxIdle: c.MaxGCBatchIdle, + MaxTimeout: intentResolutionSendBatchTimeout, + InFlightBackpressureLimit: inFlightBackpressureLimit, + Stopper: c.Stopper, + Sender: c.DB.NonTransactionalSender(), }) intentResolutionBatchSize := intentResolverBatchSize intentResolutionRangeBatchSize := intentResolverRangeBatchSize @@ -226,23 +240,27 @@ func New(c Config) *IntentResolver { intentResolutionRangeBatchSize = c.TestingKnobs.MaxIntentResolutionBatchSize } ir.irBatcher = requestbatcher.New(requestbatcher.Config{ - AmbientCtx: c.AmbientCtx, - Name: "intent_resolver_ir_batcher", - MaxMsgsPerBatch: intentResolutionBatchSize, - MaxWait: c.MaxIntentResolutionBatchWait, - MaxIdle: c.MaxIntentResolutionBatchIdle, - Stopper: c.Stopper, - Sender: 
c.DB.NonTransactionalSender(), + AmbientCtx: c.AmbientCtx, + Name: "intent_resolver_ir_batcher", + MaxMsgsPerBatch: intentResolutionBatchSize, + MaxWait: c.MaxIntentResolutionBatchWait, + MaxIdle: c.MaxIntentResolutionBatchIdle, + MaxTimeout: intentResolutionSendBatchTimeout, + InFlightBackpressureLimit: inFlightBackpressureLimit, + Stopper: c.Stopper, + Sender: c.DB.NonTransactionalSender(), }) ir.irRangeBatcher = requestbatcher.New(requestbatcher.Config{ - AmbientCtx: c.AmbientCtx, - Name: "intent_resolver_ir_range_batcher", - MaxMsgsPerBatch: intentResolutionRangeBatchSize, - MaxKeysPerBatchReq: intentResolverRangeRequestSize, - MaxWait: c.MaxIntentResolutionBatchWait, - MaxIdle: c.MaxIntentResolutionBatchIdle, - Stopper: c.Stopper, - Sender: c.DB.NonTransactionalSender(), + AmbientCtx: c.AmbientCtx, + Name: "intent_resolver_ir_range_batcher", + MaxMsgsPerBatch: intentResolutionRangeBatchSize, + MaxKeysPerBatchReq: intentResolverRangeRequestSize, + MaxWait: c.MaxIntentResolutionBatchWait, + MaxIdle: c.MaxIntentResolutionBatchIdle, + MaxTimeout: intentResolutionSendBatchTimeout, + InFlightBackpressureLimit: inFlightBackpressureLimit, + Stopper: c.Stopper, + Sender: c.DB.NonTransactionalSender(), }) return ir } diff --git a/pkg/kv/kvserver/intentresolver/intent_resolver_integration_test.go b/pkg/kv/kvserver/intentresolver/intent_resolver_integration_test.go new file mode 100644 index 000000000000..e719b1cb061f --- /dev/null +++ b/pkg/kv/kvserver/intentresolver/intent_resolver_integration_test.go @@ -0,0 +1,169 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+package intentresolver_test + +import ( + "context" + "fmt" + "strconv" + "sync/atomic" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +func forceScanOnAllReplicationQueues(tc *testcluster.TestCluster) (err error) { + for _, s := range tc.Servers { + err = s.Stores().VisitStores(func(store *kvserver.Store) error { + return store.ForceReplicationScanAndProcess() + }) + } + return err +} + +// TestIntentResolutionUnavailableRange tests that InFlightBackpressureLimit +// resolve intent batches for an unavailable range do not stall indefinitely +// and instead time out, allowing other resolve intent requests and queries for +// available ranges to be unblocked, run, and finish. This test does this by +// creating InFlightBackpressureLimit intent resolution batches containing +// range R, making range R unavailable and clogging up the intent resolver, +// having a transaction read a write intent to trigger intent resolution, and +// testing that this read finishes (which can only happen if the previous intent +// resolution batches on the unavailable range R timed out). +func TestIntentResolutionUnavailableRange(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + // Set server args. We set InFlightBackpressureLimit to 1 so that we only + // need one intent resolution batch to clog up the intent resolver. 
+ const numNodes = 3 + const inFlightBackpressureLimit = 1 + const intentResolutionSendBatchTimeout = 1 * time.Second + serverArgs := make(map[int]base.TestServerArgs) + for i := 1; i <= numNodes; i++ { + serverArgs[i-1] = base.TestServerArgs{ + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + { + Key: "rack", Value: strconv.Itoa(i), + }, + }, + }, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + IntentResolverKnobs: kvserverbase.IntentResolverTestingKnobs{ + InFlightBackpressureLimit: inFlightBackpressureLimit, + MaxIntentResolutionSendBatchTimeout: intentResolutionSendBatchTimeout, + }, + }, + }, + } + } + + // Start test cluster. + clusterArgs := base.TestClusterArgs{ + ReplicationMode: base.ReplicationAuto, + ServerArgsPerNode: serverArgs, + } + tc := testcluster.StartTestCluster(t, numNodes, clusterArgs) + defer tc.Stopper().Stop(ctx) + + db := tc.ServerConn(0) + + // Create 2 tables with replication factor 1 located on different servers and + // wait for replication and rebalancing to finish. + numTables := 2 + for i := 1; i <= numTables; i++ { + _, err := db.Exec(fmt.Sprintf("CREATE TABLE t%d (i INT PRIMARY KEY, j INT)", i)) + require.NoError(t, err) + _, err = db.Exec(fmt.Sprintf("ALTER TABLE t%d CONFIGURE ZONE USING num_replicas = 1, constraints = '{\"+rack=%d\": 1}'", i, i)) + require.NoError(t, err) + } + testutils.SucceedsSoon(t, func() error { + if err := forceScanOnAllReplicationQueues(tc); err != nil { + return err + } + for i := 1; i <= numTables; i++ { + r := db.QueryRow(fmt.Sprintf("select replicas from [show ranges from table t%d]", i)) + var repl string + if err := r.Scan(&repl); err != nil { + return err + } + if repl != fmt.Sprintf("{%d}", i) { + return errors.Newf("Expected replicas {%d} for table t%d, got %s", i, i, repl) + } + } + return nil + }) + + { + // Insert a row into two tables in a transaction. 
+ tx, err := db.Begin() + require.NoError(t, err) + for i := 1; i <= numTables; i++ { + _, err = tx.Exec(fmt.Sprintf("INSERT INTO t%d (i, j) VALUES (0, 0)", i)) + require.NoError(t, err) + } + err = tx.Commit() + require.NoError(t, err) + } + + // Immediately make the range for t2 unavailable so intent resolution in the + // above transaction cannot finish successfully and must rely on the timeout + // to finish. Sleep 400ms to ensure async intent cleanup has started. + tc.Servers[1].Stopper().Stop(ctx) + time.Sleep(400 * time.Millisecond) + + { + // One transaction updates a row in the first table, but does not commit + // immediately, ensuring a later read encountering this write intent + // (uncommitted value) triggers intent resolution. + tx, err := db.Begin() + require.NoError(t, err) + _, err = tx.Exec("UPDATE t1 SET j = 1 WHERE i = 0") + require.NoError(t, err) + + go func() { + time.Sleep(400 * time.Millisecond) + err = tx.Commit() + require.NoError(t, err) + }() + } + + { + // A later transaction reads from the above row and tries to resolve the + // intent. We test this read finishes, which can only happen if the intent + // resolution on the unavailable range timed out and finished. 
+ db2 := tc.ServerConn(2) + var done int32 + go func() { + _, _ = db2.Exec("SELECT * FROM t1") + atomic.StoreInt32(&done, 1) + }() + testutils.SucceedsSoon(t, func() error { + if atomic.LoadInt32(&done) != 1 { + return errors.New("SELECT * FROM t1 did not complete because intent resolution has been clogged up") + } + return nil + }) + } +} diff --git a/pkg/kv/kvserver/intentresolver/intent_resolver_test.go b/pkg/kv/kvserver/intentresolver/intent_resolver_test.go index d32808ca0675..03d8116cc10f 100644 --- a/pkg/kv/kvserver/intentresolver/intent_resolver_test.go +++ b/pkg/kv/kvserver/intentresolver/intent_resolver_test.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // TestCleanupTxnIntentsOnGCAsync exercises the code which is used to @@ -365,7 +366,7 @@ func TestCleanupMultipleIntentsAsync(t *testing.T) { pushed []string resolved []string } - pushOrResolveFunc := func(ba *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + pushOrResolveFunc := func(_ context.Context, ba *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { switch ba.Requests[0].GetInner().Method() { case roachpb.PushTxn: for _, ru := range ba.Requests { @@ -373,14 +374,14 @@ func TestCleanupMultipleIntentsAsync(t *testing.T) { reqs.pushed = append(reqs.pushed, string(ru.GetPushTxn().Key)) reqs.Unlock() } - return pushTxnSendFunc(t, len(ba.Requests))(ba) + return pushTxnSendFunc(t, len(ba.Requests))(ctx, ba) case roachpb.ResolveIntent: for _, ru := range ba.Requests { reqs.Lock() reqs.resolved = append(reqs.resolved, string(ru.GetResolveIntent().Key)) reqs.Unlock() } - return resolveIntentsSendFunc(t)(ba) + return resolveIntentsSendFunc(t)(ctx, ba) default: return nil, roachpb.NewErrorf("unexpected") } @@ -469,7 +470,7 @@ func TestCleanupTxnIntentsAsyncWithPartialRollback(t *testing.T) { txn.IgnoredSeqNums = 
[]enginepb.IgnoredSeqNumRange{{Start: 1, End: 1}} var gotResolveIntent, gotResolveIntentRange int32 - check := func(ba *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + check := func(_ context.Context, ba *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { for _, r := range ba.Requests { if ri, ok := r.GetInner().(*roachpb.ResolveIntentRequest); ok { atomic.StoreInt32(&gotResolveIntent, 1) @@ -627,7 +628,7 @@ func TestCleanupMultipleTxnIntentsAsync(t *testing.T) { resolved []string gced []string } - resolveOrGCFunc := func(ba *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + resolveOrGCFunc := func(_ context.Context, ba *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { if len(ba.Requests) != 1 { return nil, roachpb.NewErrorf("unexpected") } @@ -637,19 +638,19 @@ func TestCleanupMultipleTxnIntentsAsync(t *testing.T) { reqs.Lock() reqs.resolved = append(reqs.resolved, string(ru.GetResolveIntent().Key)) reqs.Unlock() - return resolveIntentsSendFunc(t)(ba) + return resolveIntentsSendFunc(t)(ctx, ba) case roachpb.ResolveIntentRange: reqs.Lock() req := ru.GetResolveIntentRange() reqs.resolved = append(reqs.resolved, fmt.Sprintf("%s-%s", string(req.Key), string(req.EndKey))) reqs.Unlock() - return resolveIntentsSendFunc(t)(ba) + return resolveIntentsSendFunc(t)(ctx, ba) case roachpb.GC: reqs.Lock() reqs.gced = append(reqs.gced, string(ru.GetGc().Key)) reqs.Unlock() - return gcSendFunc(t)(ba) + return gcSendFunc(t)(ctx, ba) default: return nil, roachpb.NewErrorf("unexpected") } @@ -755,6 +756,75 @@ func TestCleanupIntents(t *testing.T) { } } +// TestIntentResolutionTimeout tests that running intent resolution with an +// unavailable range eventually times out and finishes, and does not block +// intent resolution on another available range. 
+func TestIntentResolutionTimeout(t *testing.T) { + defer leaktest.AfterTest(t)() + + // c is to ensure that intent resolution on the available range occurs after + // intent resolution on the unavailable range. + c := make(chan struct{}) + unavailableRangeSendFunc := func(ctx context.Context, _ *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + c <- struct{}{} + <-ctx.Done() + return nil, &roachpb.Error{} + } + sf := func() *sendFuncs { + s := newSendFuncs(t) + s.pushFrontLocked( + singlePushTxnSendFunc(t), + unavailableRangeSendFunc, + singlePushTxnSendFunc(t), + resolveIntentsSendFunc(t), + ) + return s + }() + stopper := stop.NewStopper() + defer stopper.Stop(context.Background()) + clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */) + cfg := Config{ + Stopper: stopper, + Clock: clock, + TestingKnobs: kvserverbase.IntentResolverTestingKnobs{ + InFlightBackpressureLimit: 1, + MaxIntentResolutionSendBatchTimeout: 1 * time.Second, + }, + } + ir := newIntentResolverWithSendFuncsConcurrentSend(cfg, sf, stopper, true) + + // Intent resolution on unavailable range. + var cleanupIntentsErrFinished int32 + go func() { + num, err := ir.CleanupIntents(context.Background(), makeTxnIntents(t, clock, 1), clock.Now(), roachpb.PUSH_ABORT) + require.Error(t, err) + require.Equal(t, num, 0) + atomic.StoreInt32(&cleanupIntentsErrFinished, 1) + }() + + // Intent resolution on available range. + var cleanupIntentsSuccessFinished int32 + go func() { + // Ensure intent resolution occurs after that of the unavailable range. 
+ <-c + num, err := ir.CleanupIntents(context.Background(), makeTxnIntents(t, clock, 1), clock.Now(), roachpb.PUSH_ABORT) + require.NoError(t, err) + require.Equal(t, num, 1) + atomic.StoreInt32(&cleanupIntentsSuccessFinished, 1) + }() + + testutils.SucceedsSoon(t, func() error { + if atomic.LoadInt32(&cleanupIntentsErrFinished) != 1 { + return errors.New("CleanupIntents of unavailable range did not finish") + } + if atomic.LoadInt32(&cleanupIntentsSuccessFinished) != 1 { + return errors.New("CleanupIntents of available range did not finish") + } + return nil + }) + assert.Equal(t, ir.Metrics.IntentResolutionFailed.Count(), int64(1)) +} + func newTransaction( name string, baseKey roachpb.Key, userPriority roachpb.UserPriority, clock *hlc.Clock, ) *roachpb.Transaction { @@ -783,17 +853,27 @@ func makeTxnIntents(t *testing.T, clock *hlc.Clock, numIntents int) []roachpb.In // the IntentResolver tries to send. They are used in conjunction with the below // function to create an IntentResolver with a slice of sendFuncs. // A library of useful sendFuncs are defined below. 
-type sendFunc func(ba *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) +type sendFunc func(ctx context.Context, ba *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) func newIntentResolverWithSendFuncs( c Config, sf *sendFuncs, stopper *stop.Stopper, +) *IntentResolver { + return newIntentResolverWithSendFuncsConcurrentSend(c, sf, stopper, false) +} + +func newIntentResolverWithSendFuncsConcurrentSend( + c Config, sf *sendFuncs, stopper *stop.Stopper, allowConcurrentSend bool, ) *IntentResolver { txnSenderFactory := kv.NonTransactionalFactoryFunc( - func(_ context.Context, ba *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + func(ctx context.Context, ba *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { sf.mu.Lock() - defer sf.mu.Unlock() f := sf.popLocked() - return f(ba) + if allowConcurrentSend { + sf.mu.Unlock() + } else { + defer sf.mu.Unlock() + } + return f(ctx, ba) }) db := kv.NewDB(log.MakeTestingAmbientCtxWithNewTracer(), txnSenderFactory, c.Clock, stopper) c.DB = db @@ -805,7 +885,7 @@ func newIntentResolverWithSendFuncs( func pushTxnSendFuncs(sf *sendFuncs, N int) sendFunc { toPush := int64(N) var f sendFunc - f = func(ba *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + f = func(_ context.Context, ba *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { if remaining := atomic.LoadInt64(&toPush); len(ba.Requests) > int(remaining) { sf.t.Errorf("expected at most %d PushTxnRequests in batch, got %d", remaining, len(ba.Requests)) @@ -820,7 +900,7 @@ func pushTxnSendFuncs(sf *sendFuncs, N int) sendFunc { } func pushTxnSendFunc(t *testing.T, numPushes int) sendFunc { - return func(ba *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + return func(_ context.Context, ba *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { if len(ba.Requests) != numPushes { t.Errorf("expected %d PushTxnRequests in batch, got %d", numPushes, len(ba.Requests)) 
@@ -856,7 +936,7 @@ func resolveIntentsSendFuncsEx( toResolve := int64(numIntents) reqsSeen := int64(0) var f sendFunc - f = func(ba *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + f = func(_ context.Context, ba *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { if remaining := atomic.LoadInt64(&toResolve); len(ba.Requests) > int(remaining) { sf.t.Errorf("expected at most %d ResolveIntentRequests in batch, got %d", remaining, len(ba.Requests)) @@ -875,7 +955,7 @@ func resolveIntentsSendFuncsEx( } func resolveIntentsSendFuncEx(t *testing.T, checkTxnStatusOpt checkTxnStatusOpt) sendFunc { - return func(ba *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + return func(_ context.Context, ba *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { return respForResolveIntentBatch(t, ba, checkTxnStatusOpt), nil } } @@ -892,12 +972,12 @@ func resolveIntentsSendFunc(t *testing.T) sendFunc { return resolveIntentsSendFuncEx(t, dontCheckTxnStatus) } -func failSendFunc(*roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { +func failSendFunc(context.Context, *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { return nil, roachpb.NewError(fmt.Errorf("boom")) } func gcSendFunc(t *testing.T) sendFunc { - return func(ba *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + return func(_ context.Context, ba *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { resp := &roachpb.BatchResponse{} for _, r := range ba.Requests { if _, ok := r.GetInner().(*roachpb.GCRequest); !ok { diff --git a/pkg/kv/kvserver/intentresolver/main_test.go b/pkg/kv/kvserver/intentresolver/main_test.go new file mode 100644 index 000000000000..1d6de3e166fa --- /dev/null +++ b/pkg/kv/kvserver/intentresolver/main_test.go @@ -0,0 +1,32 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +package intentresolver_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func init() { + securityassets.SetLoader(securitytest.EmbeddedAssets) +} + +func TestMain(m *testing.M) { + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + + os.Exit(m.Run()) +} diff --git a/pkg/kv/kvserver/kvserverbase/knobs.go b/pkg/kv/kvserver/kvserverbase/knobs.go index 6a79fdb10f89..493d7d334085 100644 --- a/pkg/kv/kvserver/kvserverbase/knobs.go +++ b/pkg/kv/kvserver/kvserverbase/knobs.go @@ -14,6 +14,8 @@ package kvserverbase +import "time" + // BatchEvalTestingKnobs contains testing helpers that are used during batch evaluation. type BatchEvalTestingKnobs struct { // TestingEvalFilter is called before evaluating each command. @@ -70,4 +72,14 @@ type IntentResolverTestingKnobs struct { // MaxIntentResolutionBatchSize overrides the maximum number of intent // resolution requests which can be sent in a single batch. MaxIntentResolutionBatchSize int + + // InFlightBackpressureLimit overrides the number of batches in flight above + // which sending intent resolution batch requests should experience + // backpressure. + InFlightBackpressureLimit int + + // MaxIntentResolutionSendBatchTimeout overrides the maximum amount of time + // that sending an intent resolution batch request can run for before timing + // out. + MaxIntentResolutionSendBatchTimeout time.Duration }