118943: kvcoord: add DistSender circuit breakers r=nvanbenschoten a=erikgrinaker

This patch adds an initial implementation of DistSender replica circuit breakers. Their primary purpose is to prevent the DistSender from getting stuck on non-functional replicas. In particular, the DistSender relies on receiving a NotLeaseHolderError (NLHE) from the replica to update its range cache and try other replicas. Otherwise, it will keep sending requests to the same broken replica, which will continue to get stuck, giving the appearance of an unavailable range. This can happen if:

- The replica stalls, e.g. with a disk stall or mutex deadlock.

- Clients time out before the replica lease acquisition attempt times out, e.g. if the replica is partitioned away from the leader.

If a replica has returned only errors in the past few seconds, or hasn't returned any responses at all, the circuit breaker will probe the replica by sending a `LeaseInfo` request. This must either return success or an NLHE pointing to a leaseholder. Otherwise, the circuit breaker trips, and the DistSender will skip the replica for future requests, optionally also cancelling in-flight requests.

Currently, only replica-level circuit breakers are implemented. If a range is unavailable, the DistSender will continue to retry replicas as today. Range-level circuit breakers can be added later if needed, but are considered out of scope here.

The circuit breakers are disabled by default for now. Some follow-up work is likely needed before they can be enabled by default:

* Improve probe scalability. Currently, a goroutine is spawned per replica probe, which is likely too expensive at large scales. We should consider batching probes to nodes/stores, and using a bounded worker pool.

* Consider follower read handling, e.g. by tracking the replica's closed timestamp and allowing requests that may still be served by it even if it's partitioned away from the leaseholder.

* Improve observability, with metrics, tracing, and logging.

* Comprehensive testing and benchmarking.

These will be addressed separately.

Resolves #105168.
Resolves #104262.
Resolves #81100.
Resolves #80713.
Epic: none
Release note (general change): gateways will now detect faulty or stalled replicas and use other replicas instead, which can prevent them from getting stuck in certain cases (e.g. with disk stalls). This behavior can be disabled via the cluster setting `kv.dist_sender.circuit_breaker.enabled`.

119880: typedesc: copy composite type elements in `AsTypesT` r=DrewKimball a=DrewKimball

This commit adds copying for the elements of a composite type in the `TypeDescriptor.AsTypesT` method. This avoids data races during type hydration.
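
The idea of copying element types rather than sharing them can be sketched as below. This is an illustration only: `Type` and `CopyType` are hypothetical stand-ins, not the actual descriptor or type structures touched by this commit.

```go
package main

import "fmt"

// Type is a hypothetical stand-in for a type that may contain element types,
// such as a composite (tuple-like) type.
type Type struct {
	Name     string
	Elements []*Type
}

// CopyType returns a deep copy of t, so that later in-place mutation of the
// copy (for example during hydration) cannot race with readers of t.
func CopyType(t *Type) *Type {
	if t == nil {
		return nil
	}
	out := &Type{Name: t.Name}
	if t.Elements != nil {
		out.Elements = make([]*Type, len(t.Elements))
		for i, e := range t.Elements {
			out.Elements[i] = CopyType(e)
		}
	}
	return out
}

func main() {
	shared := &Type{Name: "composite", Elements: []*Type{{Name: "int"}, {Name: "string"}}}
	local := CopyType(shared)
	local.Elements[0].Name = "hydrated" // mutates only the copy
	fmt.Println(shared.Elements[0].Name, local.Elements[0].Name)
}
```

Copying only the outer struct would leave both values pointing at the same element objects, so mutating one would still be visible through the other.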

Fixes #119866

Release note: None

Co-authored-by: Erik Grinaker <[email protected]>
Co-authored-by: Drew Kimball <[email protected]>
3 people committed Mar 4, 2024
3 parents 48dc97b + 68a90a6 + 1bec08c commit bf013ea
Showing 7 changed files with 1,139 additions and 12 deletions.
5 changes: 5 additions & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -52,6 +52,11 @@ kv.closed_timestamp.follower_reads.enabled boolean true allow (all) replicas to
kv.closed_timestamp.lead_for_global_reads_override duration 0s if nonzero, overrides the lead time that global_read ranges use to publish closed timestamps system-visible
kv.closed_timestamp.side_transport_interval duration 200ms the interval at which the closed timestamp side-transport attempts to advance each range's closed timestamp; set to 0 to disable the side-transport system-visible
kv.closed_timestamp.target_duration duration 3s if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration system-visible
kv.dist_sender.circuit_breaker.cancellation.enabled boolean true when enabled, in-flight requests will be cancelled when the circuit breaker trips application
kv.dist_sender.circuit_breaker.enabled boolean true enable circuit breakers for failing or stalled replicas application
kv.dist_sender.circuit_breaker.probe.interval duration 3s interval between replica probes application
kv.dist_sender.circuit_breaker.probe.threshold duration 3s duration of errors or stalls after which a replica will be probed application
kv.dist_sender.circuit_breaker.probe.timeout duration 3s timeout for replica probes application
kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records system-visible
kv.rangefeed.client.stream_startup_rate integer 100 controls the rate per second the client will initiate new rangefeed stream for a single range; 0 implies unlimited application
kv.rangefeed.closed_timestamp_refresh_interval duration 3s the interval at which closed-timestamp updates are delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval system-visible
5 changes: 5 additions & 0 deletions docs/generated/settings/settings.html
@@ -67,6 +67,11 @@
<tr><td><div id="setting-kv-closed-timestamp-lead-for-global-reads-override" class="anchored"><code>kv.closed_timestamp.lead_for_global_reads_override</code></div></td><td>duration</td><td><code>0s</code></td><td>if nonzero, overrides the lead time that global_read ranges use to publish closed timestamps</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-kv-closed-timestamp-side-transport-interval" class="anchored"><code>kv.closed_timestamp.side_transport_interval</code></div></td><td>duration</td><td><code>200ms</code></td><td>the interval at which the closed timestamp side-transport attempts to advance each range&#39;s closed timestamp; set to 0 to disable the side-transport</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-kv-closed-timestamp-target-duration" class="anchored"><code>kv.closed_timestamp.target_duration</code></div></td><td>duration</td><td><code>3s</code></td><td>if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-kv-dist-sender-circuit-breaker-cancellation-enabled" class="anchored"><code>kv.dist_sender.circuit_breaker.cancellation.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>when enabled, in-flight requests will be cancelled when the circuit breaker trips</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-dist-sender-circuit-breaker-enabled" class="anchored"><code>kv.dist_sender.circuit_breaker.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>enable circuit breakers for failing or stalled replicas</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-dist-sender-circuit-breaker-probe-interval" class="anchored"><code>kv.dist_sender.circuit_breaker.probe.interval</code></div></td><td>duration</td><td><code>3s</code></td><td>interval between replica probes</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-dist-sender-circuit-breaker-probe-threshold" class="anchored"><code>kv.dist_sender.circuit_breaker.probe.threshold</code></div></td><td>duration</td><td><code>3s</code></td><td>duration of errors or stalls after which a replica will be probed</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-dist-sender-circuit-breaker-probe-timeout" class="anchored"><code>kv.dist_sender.circuit_breaker.probe.timeout</code></div></td><td>duration</td><td><code>3s</code></td><td>timeout for replica probes</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-lease-transfer-read-summary-global-budget" class="anchored"><code>kv.lease_transfer_read_summary.global_budget</code></div></td><td>byte size</td><td><code>0 B</code></td><td>controls the maximum number of bytes that will be used to summarize the global segment of the timestamp cache during lease transfers and range merges. A smaller budget will result in loss of precision.</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-lease-transfer-read-summary-local-budget" class="anchored"><code>kv.lease_transfer_read_summary.local_budget</code></div></td><td>byte size</td><td><code>4.0 MiB</code></td><td>controls the maximum number of bytes that will be used to summarize the local segment of the timestamp cache during lease transfers and range merges. A smaller budget will result in loss of precision.</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-log-range-and-node-events-enabled" class="anchored"><code>kv.log_range_and_node_events.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelog and node join and restart events into system.eventlog</td><td>Dedicated/Self-Hosted</td></tr>
3 changes: 3 additions & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
@@ -9,6 +9,7 @@ go_library(
"batch.go",
"condensable_span_set.go",
"dist_sender.go",
"dist_sender_circuit_breaker.go",
"dist_sender_mux_rangefeed.go",
"dist_sender_rangefeed.go",
"dist_sender_rangefeed_canceler.go",
@@ -66,6 +67,7 @@ go_library(
"//pkg/util",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/circuit",
"//pkg/util/ctxgroup",
"//pkg/util/envutil",
"//pkg/util/errorutil/unimplemented",
@@ -122,6 +124,7 @@ go_test(
"batch_test.go",
"condensable_span_set_test.go",
"dist_sender_ambiguous_test.go",
"dist_sender_circuit_breaker_test.go",
"dist_sender_rangefeed_canceler_test.go",
"dist_sender_rangefeed_mock_test.go",
"dist_sender_rangefeed_test.go",
45 changes: 34 additions & 11 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -545,6 +545,7 @@ type DistSender struct {
transportFactory TransportFactory
rpcRetryOptions retry.Options
asyncSenderSem *quotapool.IntPool
circuitBreakers *DistSenderCircuitBreakers

// batchInterceptor is set for tenants; when set, information about all
// BatchRequests and BatchResponses are passed through this interceptor, which
@@ -726,6 +727,13 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
})
}

// Set up circuit breakers and spawn the manager goroutine, which runs until
// the stopper stops. This can only error if the server is shutting down, so
// ignore the returned error.
ds.circuitBreakers = NewDistSenderCircuitBreakers(
ds.stopper, ds.st, ds.transportFactory, ds.metrics)
_ = ds.circuitBreakers.Start()

if cfg.TestingKnobs.LatencyFunc != nil {
ds.latencyFunc = cfg.TestingKnobs.LatencyFunc
}
@@ -2454,23 +2462,38 @@ func (ds *DistSender) sendToReplicas(
 		ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size()))

 		tBegin := timeutil.Now() // for slow log message
-		br, err = transport.SendNext(ctx, ba)
-		if dur := timeutil.Since(tBegin); dur > slowDistSenderReplicaThreshold {
-			var s redact.StringBuilder
-			slowReplicaRPCWarningStr(&s, ba, dur, attempts, err, br)
-			if admissionpb.WorkPriority(ba.AdmissionHeader.Priority) >= admissionpb.NormalPri {
-				// Note that these RPC may or may not have succeeded. Errors are counted separately below.
-				ds.metrics.SlowReplicaRPCs.Inc(1)
-				log.Warningf(ctx, "slow replica RPC: %v", &s)
-			} else {
-				log.Eventf(ctx, "slow replica RPC: %v", &s)
+		sendCtx, cbToken, cbErr := ds.circuitBreakers.ForReplica(desc, &curReplica).
+			Track(ctx, ba, tBegin.UnixNano())
+		if cbErr != nil {
+			// Circuit breaker is tripped. err will be handled below.
+			err = cbErr
+			transport.SkipReplica()
+		} else {
+			br, err = transport.SendNext(sendCtx, ba)
+			tEnd := timeutil.Now()
+			cbToken.Done(br, err, tEnd.UnixNano())
+
+			if dur := tEnd.Sub(tBegin); dur > slowDistSenderReplicaThreshold {
+				var s redact.StringBuilder
+				slowReplicaRPCWarningStr(&s, ba, dur, attempts, err, br)
+				if admissionpb.WorkPriority(ba.AdmissionHeader.Priority) >= admissionpb.NormalPri {
+					// Note that these RPC may or may not have succeeded. Errors are counted separately below.
+					ds.metrics.SlowReplicaRPCs.Inc(1)
+					log.Warningf(ctx, "slow replica RPC: %v", &s)
+				} else {
+					log.Eventf(ctx, "slow replica RPC: %v", &s)
+				}
 			}
 		}

 		ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size()))
 		ds.maybeIncrementErrCounters(br, err)

-		if err != nil {
+		if cbErr != nil {
+			log.VErrEventf(ctx, 2, "circuit breaker error: %s", cbErr)
+			// We know the request did not start, so the error is not ambiguous.
+
+		} else if err != nil {
 			log.VErrEventf(ctx, 2, "RPC error: %s", err)

 			if grpcutil.IsAuthError(err) {