Skip to content

Commit

Permalink
intentresolver: Add max timeout to RequestBatcher and IntentResolver
Browse files Browse the repository at this point in the history
There is a limit of 1000 in-flight intent resolution request batches
that can be processed at a time before backpressure. We saw a case where
an unavailable range resulted in many intent resolution request batches
to be stuck, clogging up the worker pool and starving the other ranges
trying to resolve intents. This resulted in more queries timing out.

To address this, this patch adds a max timeout to RequestBatcher and
IntentResolver to ensure that no worker trying to resolve a batch of
intents gets stuck indefinitely e.g. due to an unavailable range.

Release note (ops change): Added max timeout to intent resolution,
preventing intent resolution from becoming stuck indefinitely and
blocking other ranges attempting to resolve intents.
  • Loading branch information
KaiSun314 committed Nov 21, 2022
1 parent ae919e3 commit e98ffcf
Show file tree
Hide file tree
Showing 8 changed files with 449 additions and 77 deletions.
19 changes: 17 additions & 2 deletions pkg/internal/client/requestbatcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ type Config struct {
// enforced. It is inadvisable to disable both MaxIdle and MaxWait.
MaxIdle time.Duration

// MaxTimeout limits the amount of time that sending a batch can run for
// before timing out. This is used to prevent batches from stalling
// indefinitely, for instance due to an unavailable range. If MaxTimeout is
// <= 0, then the send batch timeout is derived from the requests' deadlines
// if they exist.
MaxTimeout time.Duration

// InFlightBackpressureLimit is the number of batches in flight above which
// sending clients should experience backpressure. If the batcher has more
// requests than this in flight it will not accept new requests until the
Expand Down Expand Up @@ -280,11 +287,19 @@ func (b *RequestBatcher) sendBatch(ctx context.Context, ba *batch) {
}
return nil
}
var deadline time.Time
if b.cfg.MaxTimeout > 0 {
deadline = timeutil.Now().Add(b.cfg.MaxTimeout)
}
if !ba.sendDeadline.IsZero() {
if deadline.IsZero() || ba.sendDeadline.Before(deadline) {
deadline = ba.sendDeadline
}
}
if !deadline.IsZero() {
actualSend := send
send = func(context.Context) error {
return contextutil.RunWithTimeout(
ctx, b.sendBatchOpName, timeutil.Until(ba.sendDeadline), actualSend)
return contextutil.RunWithTimeout(ctx, b.sendBatchOpName, timeutil.Until(deadline), actualSend)
}
}
// Send requests in a loop to support pagination, which may be necessary
Expand Down
101 changes: 67 additions & 34 deletions pkg/internal/client/requestbatcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,46 +312,79 @@ func TestPanicWithNilStopper(t *testing.T) {
}

// TestBatchTimeout verifies the RequestBatcher uses the context with the
// deadline from the latest call to send.
// deadline from the latest call and max timeout to send.
func TestBatchTimeout(t *testing.T) {
defer leaktest.AfterTest(t)()
const timeout = 5 * time.Millisecond
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
sc := make(chanSender)
t.Run("WithTimeout", func(t *testing.T) {
b := New(Config{
// MaxMsgsPerBatch of 1 is chosen so that the first call to Send will
// immediately lead to a batch being sent.
MaxMsgsPerBatch: 1,
Sender: sc,
Stopper: stopper,
})
// This test attempts to verify that a batch with a request with a timeout
// will be sent with that timeout. The test faces challenges of timing.
// There are several different phases at which the timeout may fire;
// the request may time out before it has been sent to the batcher, it
// may timeout while it is being sent or it may not time out until after
// it has been sent. Each of these cases are handled and verified to ensure
// that the request was indeed sent with a timeout.
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
respChan := make(chan Response, 1)
if err := b.SendWithChan(ctx, respChan, 1, &roachpb.GetRequest{}); err != nil {
testutils.IsError(err, context.DeadlineExceeded.Error())
return
}
select {
case s := <-sc:
deadline, hasDeadline := s.ctx.Deadline()
assert.True(t, hasDeadline)
assert.True(t, timeutil.Until(deadline) < timeout)
s.respChan <- batchResp{}
case resp := <-respChan:
assert.Nil(t, resp.Resp)
testutils.IsError(resp.Err, context.DeadlineExceeded.Error())
}
})
testCases := []struct {
requestTimeout time.Duration
maxTimeout time.Duration
expectedTimeout time.Duration
}{
{
requestTimeout: timeout,
maxTimeout: 0,
expectedTimeout: timeout,
},
{
requestTimeout: 0,
maxTimeout: timeout,
expectedTimeout: timeout,
},
{
requestTimeout: timeout,
maxTimeout: 3 * time.Millisecond,
expectedTimeout: 3 * time.Millisecond,
},
{
requestTimeout: timeout,
maxTimeout: 7 * time.Millisecond,
expectedTimeout: timeout,
},
}
for _, tc := range testCases {
t.Run(fmt.Sprintf("With%sRequestTimeout%sMaxTimeout", tc.requestTimeout, tc.maxTimeout),
func(t *testing.T) {
b := New(Config{
// MaxMsgsPerBatch of 1 is chosen so that the first call to Send will
// immediately lead to a batch being sent.
MaxMsgsPerBatch: 1,
Sender: sc,
Stopper: stopper,
MaxTimeout: tc.maxTimeout,
})
// This test attempts to verify that a batch with a request with a
// timeout will be sent with that timeout. The test faces challenges of
// timing. There are several different phases at which the timeout may
// fire; the request may time out before it has been sent to the
// batcher, it may timeout while it is being sent or it may not time
// out until after it has been sent. Each of these cases are handled
// and verified to ensure that the request was indeed sent with a
// timeout.
ctx, cancel := context.WithTimeout(context.Background(), tc.requestTimeout)
defer cancel()
respChan := make(chan Response, 1)
if err := b.SendWithChan(ctx, respChan, 1, &roachpb.GetRequest{}); err != nil {
testutils.IsError(err, context.DeadlineExceeded.Error())
return
}
select {
case s := <-sc:
deadline, hasDeadline := s.ctx.Deadline()
assert.True(t, hasDeadline)
assert.True(t, timeutil.Until(deadline) < tc.expectedTimeout)
assert.True(t, timeutil.Until(deadline) > tc.expectedTimeout-time.Millisecond)
s.respChan <- batchResp{}
case resp := <-respChan:
assert.Nil(t, resp.Resp)
testutils.IsError(resp.Err, context.DeadlineExceeded.Error())
}
},
)
}
t.Run("NoTimeout", func(t *testing.T) {
b := New(Config{
// MaxMsgsPerBatch of 2 is chosen so that the second call to Send will
Expand Down
15 changes: 14 additions & 1 deletion pkg/kv/kvserver/intentresolver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,37 @@ go_library(
go_test(
name = "intentresolver_test",
size = "small",
srcs = ["intent_resolver_test.go"],
srcs = [
"intent_resolver_integration_test.go",
"intent_resolver_test.go",
"main_test.go",
],
args = ["-test.timeout=55s"],
embed = [":intentresolver"],
deps = [
"//pkg/base",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/batcheval/result",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/storage/enginepb",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)

Expand Down
62 changes: 40 additions & 22 deletions pkg/kv/kvserver/intentresolver/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ const (
// ResumeSpan and the batcher will send a new range request.
intentResolverRangeRequestSize = 200

// intentResolverSendBatchTimeout is the maximum amount of time an intent
// resolution batch request can run for before timeout.
intentResolverSendBatchTimeout = 1 * time.Minute

// MaxTxnsPerIntentCleanupBatch is the number of transactions whose
// corresponding intents will be resolved at a time. Intents are batched
// by transaction to avoid timeouts while resolving intents and ensure that
Expand Down Expand Up @@ -206,18 +210,28 @@ func New(c Config) *IntentResolver {
c.Stopper.AddCloser(ir.sem.Closer("stopper"))
ir.mu.inFlightPushes = map[uuid.UUID]int{}
ir.mu.inFlightTxnCleanups = map[uuid.UUID]struct{}{}
intentResolutionSendBatchTimeout := intentResolverSendBatchTimeout
if c.TestingKnobs.MaxIntentResolutionSendBatchTimeout != 0 {
intentResolutionSendBatchTimeout = c.TestingKnobs.MaxIntentResolutionSendBatchTimeout
}
inFlightBackpressureLimit := requestbatcher.DefaultInFlightBackpressureLimit
if c.TestingKnobs.InFlightBackpressureLimit != 0 {
inFlightBackpressureLimit = c.TestingKnobs.InFlightBackpressureLimit
}
gcBatchSize := gcBatchSize
if c.TestingKnobs.MaxIntentResolutionBatchSize > 0 {
gcBatchSize = c.TestingKnobs.MaxGCBatchSize
}
ir.gcBatcher = requestbatcher.New(requestbatcher.Config{
AmbientCtx: c.AmbientCtx,
Name: "intent_resolver_gc_batcher",
MaxMsgsPerBatch: gcBatchSize,
MaxWait: c.MaxGCBatchWait,
MaxIdle: c.MaxGCBatchIdle,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
AmbientCtx: c.AmbientCtx,
Name: "intent_resolver_gc_batcher",
MaxMsgsPerBatch: gcBatchSize,
MaxWait: c.MaxGCBatchWait,
MaxIdle: c.MaxGCBatchIdle,
MaxTimeout: intentResolutionSendBatchTimeout,
InFlightBackpressureLimit: inFlightBackpressureLimit,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
})
intentResolutionBatchSize := intentResolverBatchSize
intentResolutionRangeBatchSize := intentResolverRangeBatchSize
Expand All @@ -226,23 +240,27 @@ func New(c Config) *IntentResolver {
intentResolutionRangeBatchSize = c.TestingKnobs.MaxIntentResolutionBatchSize
}
ir.irBatcher = requestbatcher.New(requestbatcher.Config{
AmbientCtx: c.AmbientCtx,
Name: "intent_resolver_ir_batcher",
MaxMsgsPerBatch: intentResolutionBatchSize,
MaxWait: c.MaxIntentResolutionBatchWait,
MaxIdle: c.MaxIntentResolutionBatchIdle,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
AmbientCtx: c.AmbientCtx,
Name: "intent_resolver_ir_batcher",
MaxMsgsPerBatch: intentResolutionBatchSize,
MaxWait: c.MaxIntentResolutionBatchWait,
MaxIdle: c.MaxIntentResolutionBatchIdle,
MaxTimeout: intentResolutionSendBatchTimeout,
InFlightBackpressureLimit: inFlightBackpressureLimit,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
})
ir.irRangeBatcher = requestbatcher.New(requestbatcher.Config{
AmbientCtx: c.AmbientCtx,
Name: "intent_resolver_ir_range_batcher",
MaxMsgsPerBatch: intentResolutionRangeBatchSize,
MaxKeysPerBatchReq: intentResolverRangeRequestSize,
MaxWait: c.MaxIntentResolutionBatchWait,
MaxIdle: c.MaxIntentResolutionBatchIdle,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
AmbientCtx: c.AmbientCtx,
Name: "intent_resolver_ir_range_batcher",
MaxMsgsPerBatch: intentResolutionRangeBatchSize,
MaxKeysPerBatchReq: intentResolverRangeRequestSize,
MaxWait: c.MaxIntentResolutionBatchWait,
MaxIdle: c.MaxIntentResolutionBatchIdle,
MaxTimeout: intentResolutionSendBatchTimeout,
InFlightBackpressureLimit: inFlightBackpressureLimit,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
})
return ir
}
Expand Down
Loading

0 comments on commit e98ffcf

Please sign in to comment.