Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

circuit: add probing-based circuit breaker #73641

Merged
merged 1 commit into from
Dec 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ ALL_TESTS = [
"//pkg/util/cache:cache_test",
"//pkg/util/caller:caller_test",
"//pkg/util/cgroups:cgroups_test",
"//pkg/util/circuit:circuit_test",
"//pkg/util/cloudinfo:cloudinfo_test",
"//pkg/util/contextutil:contextutil_test",
"//pkg/util/ctxgroup:ctxgroup_test",
Expand Down
38 changes: 38 additions & 0 deletions pkg/util/circuit/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "circuit",
srcs = [
"circuitbreaker.go",
"event_handler.go",
"options.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/util/circuit",
visibility = ["//visibility:public"],
deps = [
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_cockroachdb_redact//interfaces",
],
)

go_test(
name = "circuit_test",
srcs = ["circuitbreaker_test.go"],
data = glob(["testdata/**"]),
embed = [":circuit"],
deps = [
"//pkg/testutils",
"//pkg/util/ctxgroup",
"//pkg/util/leaktest",
"//pkg/util/randutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)
244 changes: 244 additions & 0 deletions pkg/util/circuit/circuitbreaker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package circuit

import (
"fmt"
"sync"

"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/cockroachdb/redact/interfaces"
)

// Breaker is a circuit breaker. Before initiating an operation protected by the
// Breaker, Breaker.Signal should be called. This provides a channel and error
// getter that operate very similarly to context.Context's Done and Err methods.
// Err can be checked to fail-fast requests before initiating work and the
// channel can be used to abort an ongoing operation when the breaker trips (at
// which point Err returns a non-nil error).
//
// The Breaker trips when Report is called, which is typically the case when the
// operation is attempted and fails in a way that the caller believes will cause
// all other operations protected by the Breaker to fail as well. A tripped
// Breaker will launch asynchronous probes that reset the breaker as soon as
// possible.
type Breaker struct {
mu struct {
syncutil.RWMutex
*Options // always replaced wholesale
// errAndCh stores a channel and the error that should be returned when that
// channel is closed. When an error is first reported, the error is set and
// the channel is closed. On any subsequent errors (or reset), a new
// &errAndCh{} is allocated (and its error set and channel closed). This
// keeps the results of Signal() stable, i.e. the caller will get the error
// that closed "their" channel, and in particular they are guaranteed to get
// an error, while keeping Signal() allocation-free.
//
// This can be leaked out of the lock (read-only); to write `err` (and, to
// keep things simple, to close ch), need the exclusive lock. See also the
// comments inside of errAndCh.
errAndCh *errAndCh

probing bool
}
}

// NewBreaker instantiates a new circuit breaker.
func NewBreaker(opts Options) *Breaker {
br := &Breaker{}
br.mu.errAndCh = br.newErrAndCh()
br.Reconfigure(opts)
return br
}

// Signal returns a channel that is closed once the breaker trips and a function
// (which may be invoked multiple times) returning a pertinent error. This is
// similar to context.Context's Done() and Err().
//
// The returned method will return the error that closed the channel (and nil
// before the channel is closed), even if the breaker has already un-tripped in
// the meantime. This means that Signal should be re-invoked before each attempt
// at carrying out an operation. Non-nil errors will always be derived from
// ErrBreakerOpen, i.e. errors.Is(err, ErrBreakerOpen) will be true.
//
// Signal is allocation-free and suitable for use in performance-sensitive code
// paths. See ExampleBreaker_Signal for a usage example.
func (b *Breaker) Signal() interface {
Err() error
C() <-chan struct{}
} {
b.mu.RLock()
defer b.mu.RUnlock()
// NB: we need to return errAndCh here, returning (errAndCh.C(), errAndCh.Err)
// allocates.
return b.mu.errAndCh
}

// Report reports a (non-nil) error to the breaker. This will trip the Breaker.
func (b *Breaker) Report(err error) {
if err == nil {
// Defense in depth: you're not supposed to pass a nil error in,
// but if it happens it's simply ignored.
return
}
// Give shouldTrip a chance to massage the error.
markErr := (*breakerErrorMark)(b)
if errors.Is(err, markErr) {
// The input error originated from this breaker. This shouldn't
// happen but since it is happening, we want to avoid creating
// longer and longer error chains below.
return
}

// Update the error. This may overwrite an earlier error, which is fine:
// We want the breaker to reflect a recent error as this is more helpful.
storeErr := errors.Mark(errors.Mark(err, ErrBreakerOpen), markErr)

// When the Breaker first trips, we populate the error and close the channel.
// When the error changes, we have to replace errAndCh wholesale (that's the
// contract, we can't mutate err once it's not nil) so we make a new channel
// that is then promptly closed.
b.mu.Lock()
prevErr := b.mu.errAndCh.err
if prevErr != nil {
b.mu.errAndCh = b.newErrAndCh()
}
// We get to write the error since we have exclusive access via b.mu.
b.mu.errAndCh.err = storeErr
close(b.mu.errAndCh.ch)
b.mu.Unlock()

opts := b.Opts()
opts.EventHandler.OnTrip(b, prevErr, storeErr)
if prevErr == nil {
// If the breaker wasn't previously tripped, trigger the probe to give the
// Breaker a shot at healing right away. If the breaker is already tripped,
// we don't want to trigger another probe as the probe itself calls Report
// and we don't want a self-perpetuating loop of probe invocations. Instead,
// we only probe when clients are actively asking the Breaker for its
// status, via Breaker.Signal.
b.maybeTriggerProbe()
}
}

// Reset resets (i.e. un-trips, if it was tripped) the breaker.
// Outside of testing, there should be no reason to call this
// as it is the probe's job to reset the breaker if appropriate.
func (b *Breaker) Reset() {
b.Opts().EventHandler.OnReset(b)
b.mu.Lock()
defer b.mu.Unlock()
b.mu.errAndCh = b.newErrAndCh()
}

// String returns the Breaker's name.
func (b *Breaker) String() string {
return redact.StringWithoutMarkers(b)
}

// SafeFormat implements redact.SafeFormatter.
func (b *Breaker) SafeFormat(s interfaces.SafePrinter, _ rune) {
s.Print(b.Opts().Name)
}

// Opts returns the active options.
func (b *Breaker) Opts() Options {
b.mu.RLock()
defer b.mu.RUnlock()
return *b.mu.Options
}

// Reconfigure swaps the active options for the supplied replacement. The breaker
// will not be reset.
func (b *Breaker) Reconfigure(opts Options) {
b.mu.Lock()
defer b.mu.Unlock()
b.mu.Options = &opts
}

func (b *Breaker) maybeTriggerProbe() {
b.mu.Lock()
if b.mu.probing {
b.mu.Unlock()
// A probe is already running.
return
}
b.mu.probing = true
opts := *b.mu.Options // ok to leak out from under the lock
b.mu.Unlock()

opts.EventHandler.OnProbeLaunched(b)
var once sync.Once
opts.AsyncProbe(
func(err error) {
if err != nil {
b.Report(err)
} else {
b.Reset()
}
},
func() {
// Avoid potential problems when probe calls done() multiple times.
// It shouldn't do that, but mistakes happen.
once.Do(func() {
opts.EventHandler.OnProbeDone(b)
b.mu.Lock()
defer b.mu.Unlock()
b.mu.probing = false
})
})
}

func (b *Breaker) newErrAndCh() *errAndCh {
return &errAndCh{
maybeTriggerProbe: b.maybeTriggerProbe,
ch: make(chan struct{}),
}
}

type errAndCh struct {
// maybeTriggerProbe is called when Err() returns non-nil. This indicates that
// the Breaker is tripped and that there is a caller that has a vested
// interest in the breaker trying to heal.
maybeTriggerProbe func() // immutable
ch chan struct{}
// INVARIANT: err can only be written once, immediately before closing ch
// (i.e. writer needs to maintain this externally). It can only be read after
// ch is closed (use `Err()`).
err error
}

func (eac *errAndCh) C() <-chan struct{} {
return eac.ch
}

func (eac *errAndCh) Err() error {
select {
case <-eac.ch:
eac.maybeTriggerProbe()
return eac.err
default:
return nil
}
}

// ErrBreakerOpen is a reference error that matches the errors returned
// from Breaker.Err(), i.e. `errors.Is(err, ErrBreakerOpen) can be
// used to check whether an error originated from some Breaker.
var ErrBreakerOpen = errors.New("breaker open")

type breakerErrorMark Breaker

func (b *breakerErrorMark) Error() string {
return fmt.Sprintf("originated at breaker %s", (*Breaker)(b).Opts().Name)
}
Loading