diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index fb184e62743f..3cacb8610397 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -352,6 +352,7 @@ ALL_TESTS = [ "//pkg/util/cache:cache_test", "//pkg/util/caller:caller_test", "//pkg/util/cgroups:cgroups_test", + "//pkg/util/circuit:circuit_test", "//pkg/util/cloudinfo:cloudinfo_test", "//pkg/util/contextutil:contextutil_test", "//pkg/util/ctxgroup:ctxgroup_test", diff --git a/pkg/util/circuit/BUILD.bazel b/pkg/util/circuit/BUILD.bazel new file mode 100644 index 000000000000..bb6ecbf2b1be --- /dev/null +++ b/pkg/util/circuit/BUILD.bazel @@ -0,0 +1,38 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "circuit", + srcs = [ + "circuitbreaker.go", + "event_handler.go", + "options.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/util/circuit", + visibility = ["//visibility:public"], + deps = [ + "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_redact//:redact", + "@com_github_cockroachdb_redact//interfaces", + ], +) + +go_test( + name = "circuit_test", + srcs = ["circuitbreaker_test.go"], + data = glob(["testdata/**"]), + embed = [":circuit"], + deps = [ + "//pkg/testutils", + "//pkg/util/ctxgroup", + "//pkg/util/leaktest", + "//pkg/util/randutil", + "//pkg/util/stop", + "//pkg/util/syncutil", + "@com_github_cockroachdb_datadriven//:datadriven", + "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_redact//:redact", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/util/circuit/circuitbreaker.go b/pkg/util/circuit/circuitbreaker.go new file mode 100644 index 000000000000..e7db8f8d88b3 --- /dev/null +++ b/pkg/util/circuit/circuitbreaker.go @@ -0,0 +1,244 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package circuit + +import ( + "fmt" + "sync" + + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" + "github.com/cockroachdb/redact/interfaces" +) + +// Breaker is a circuit breaker. Before initiating an operation protected by the +// Breaker, Breaker.Signal should be called. This provides a channel and error +// getter that operate very similarly to context.Context's Done and Err methods. +// Err can be checked to fail-fast requests before initiating work and the +// channel can be used to abort an ongoing operation when the breaker trips (at +// which point Err returns a non-nil error). +// +// The Breaker trips when Report is called, which is typically the case when the +// operation is attempted and fails in a way that the caller believes will cause +// all other operations protected by the Breaker to fail as well. A tripped +// Breaker will launch asynchronous probes that reset the breaker as soon as +// possible. +type Breaker struct { + mu struct { + syncutil.RWMutex + *Options // always replaced wholesale + // errAndCh stores a channel and the error that should be returned when that + // channel is closed. When an error is first reported, the error is set and + // the channel is closed. On any subsequent errors (or reset), a new + // &errAndCh{} is allocated (and its error set and channel closed). This + // keeps the results of Signal() stable, i.e. the caller will get the error + // that closed "their" channel, and in particular they are guaranteed to get + // an error, while keeping Signal() allocation-free. + // + // This can be leaked out of the lock (read-only); to write `err` (and, to + // keep things simple, to close ch), need the exclusive lock. See also the + // comments inside of errAndCh. + errAndCh *errAndCh + + probing bool + } +} + +// NewBreaker instantiates a new circuit breaker. +func NewBreaker(opts Options) *Breaker { + br := &Breaker{} + br.mu.errAndCh = br.newErrAndCh() + br.Reconfigure(opts) + return br +} + +// Signal returns a channel that is closed once the breaker trips and a function +// (which may be invoked multiple times) returning a pertinent error. This is +// similar to context.Context's Done() and Err(). +// +// The returned method will return the error that closed the channel (and nil +// before the channel is closed), even if the breaker has already un-tripped in +// the meantime. This means that Signal should be re-invoked before each attempt +// at carrying out an operation. Non-nil errors will always be derived from +// ErrBreakerOpen, i.e. errors.Is(err, ErrBreakerOpen) will be true. +// +// Signal is allocation-free and suitable for use in performance-sensitive code +// paths. See ExampleBreaker_Signal for a usage example. +func (b *Breaker) Signal() interface { + Err() error + C() <-chan struct{} +} { + b.mu.RLock() + defer b.mu.RUnlock() + // NB: we need to return errAndCh here, returning (errAndCh.C(), errAndCh.Err) + // allocates. + return b.mu.errAndCh +} + +// Report reports a (non-nil) error to the breaker. This will trip the Breaker. +func (b *Breaker) Report(err error) { + if err == nil { + // Defense in depth: you're not supposed to pass a nil error in, + // but if it happens it's simply ignored. + return + } + // Give shouldTrip a chance to massage the error. + markErr := (*breakerErrorMark)(b) + if errors.Is(err, markErr) { + // The input error originated from this breaker. This shouldn't + // happen but since it is happening, we want to avoid creating + // longer and longer error chains below. + return + } + + // Update the error. This may overwrite an earlier error, which is fine: + // We want the breaker to reflect a recent error as this is more helpful. + storeErr := errors.Mark(errors.Mark(err, ErrBreakerOpen), markErr) + + // When the Breaker first trips, we populate the error and close the channel. + // When the error changes, we have to replace errAndCh wholesale (that's the + // contract, we can't mutate err once it's not nil) so we make a new channel + // that is then promptly closed. + b.mu.Lock() + prevErr := b.mu.errAndCh.err + if prevErr != nil { + b.mu.errAndCh = b.newErrAndCh() + } + // We get to write the error since we have exclusive access via b.mu. + b.mu.errAndCh.err = storeErr + close(b.mu.errAndCh.ch) + b.mu.Unlock() + + opts := b.Opts() + opts.EventHandler.OnTrip(b, prevErr, storeErr) + if prevErr == nil { + // If the breaker wasn't previously tripped, trigger the probe to give the + // Breaker a shot at healing right away. If the breaker is already tripped, + // we don't want to trigger another probe as the probe itself calls Report + // and we don't want a self-perpetuating loop of probe invocations. Instead, + // we only probe when clients are actively asking the Breaker for its + // status, via Breaker.Signal. + b.maybeTriggerProbe() + } +} + +// Reset resets (i.e. un-trips, if it was tripped) the breaker. +// Outside of testing, there should be no reason to call this +// as it is the probe's job to reset the breaker if appropriate. +func (b *Breaker) Reset() { + b.Opts().EventHandler.OnReset(b) + b.mu.Lock() + defer b.mu.Unlock() + b.mu.errAndCh = b.newErrAndCh() +} + +// String returns the Breaker's name. +func (b *Breaker) String() string { + return redact.StringWithoutMarkers(b) +} + +// SafeFormat implements redact.SafeFormatter. +func (b *Breaker) SafeFormat(s interfaces.SafePrinter, _ rune) { + s.Print(b.Opts().Name) +} + +// Opts returns the active options. +func (b *Breaker) Opts() Options { + b.mu.RLock() + defer b.mu.RUnlock() + return *b.mu.Options +} + +// Reconfigure swaps the active options for the supplied replacement. The breaker +// will not be reset. +func (b *Breaker) Reconfigure(opts Options) { + b.mu.Lock() + defer b.mu.Unlock() + b.mu.Options = &opts +} + +func (b *Breaker) maybeTriggerProbe() { + b.mu.Lock() + if b.mu.probing { + b.mu.Unlock() + // A probe is already running. + return + } + b.mu.probing = true + opts := *b.mu.Options // ok to leak out from under the lock + b.mu.Unlock() + + opts.EventHandler.OnProbeLaunched(b) + var once sync.Once + opts.AsyncProbe( + func(err error) { + if err != nil { + b.Report(err) + } else { + b.Reset() + } + }, + func() { + // Avoid potential problems when probe calls done() multiple times. + // It shouldn't do that, but mistakes happen. + once.Do(func() { + opts.EventHandler.OnProbeDone(b) + b.mu.Lock() + defer b.mu.Unlock() + b.mu.probing = false + }) + }) +} + +func (b *Breaker) newErrAndCh() *errAndCh { + return &errAndCh{ + maybeTriggerProbe: b.maybeTriggerProbe, + ch: make(chan struct{}), + } +} + +type errAndCh struct { + // maybeTriggerProbe is called when Err() returns non-nil. This indicates that + // the Breaker is tripped and that there is a caller that has a vested + // interest in the breaker trying to heal. + maybeTriggerProbe func() // immutable + ch chan struct{} + // INVARIANT: err can only be written once, immediately before closing ch + // (i.e. writer needs to maintain this externally). It can only be read after + // ch is closed (use `Err()`). + err error +} + +func (eac *errAndCh) C() <-chan struct{} { + return eac.ch +} + +func (eac *errAndCh) Err() error { + select { + case <-eac.ch: + eac.maybeTriggerProbe() + return eac.err + default: + return nil + } +} + +// ErrBreakerOpen is a reference error that matches the errors returned +// from Breaker.Err(), i.e. `errors.Is(err, ErrBreakerOpen) can be +// used to check whether an error originated from some Breaker. +var ErrBreakerOpen = errors.New("breaker open") + +type breakerErrorMark Breaker + +func (b *breakerErrorMark) Error() string { + return fmt.Sprintf("originated at breaker %s", (*Breaker)(b).Opts().Name) +} diff --git a/pkg/util/circuit/circuitbreaker_test.go b/pkg/util/circuit/circuitbreaker_test.go new file mode 100644 index 000000000000..c3d1ac2e755b --- /dev/null +++ b/pkg/util/circuit/circuitbreaker_test.go @@ -0,0 +1,403 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package circuit + +import ( + "context" + "fmt" + "path/filepath" + "runtime" + "sync/atomic" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func testLogBridge(t *testing.T) (*redact.StringBuilder, *EventLogger) { + var mu syncutil.Mutex + var allBuf redact.StringBuilder + return &allBuf, &EventLogger{Log: func(buf redact.StringBuilder) { + mu.Lock() + defer mu.Unlock() + allBuf.Printf("%s\n", buf) + t.Log(buf) + }} +} + +func TestBreaker(t *testing.T) { + defer leaktest.AfterTest(t)() + allBuf, eh := testLogBridge(t) + var report func(err error) + var done func() + br := NewBreaker(Options{ + Name: "mybreaker", + AsyncProbe: func(_report func(error), _done func()) { + if report != nil || done != nil { + t.Error("probe launched twice") + } + report = _report + done = _done + }, + EventHandler: eh, + }) + sig := br.Signal() + select { + case <-sig.C(): + t.Fatalf("channel should be open") + default: + } + // Nobody should call errFn in this situation, but it's fine if they do. + require.NoError(t, sig.Err()) + require.NoError(t, br.Signal().Err()) + require.Nil(t, report) + require.Nil(t, done) + boomErr := errors.New("boom") + br.Report(boomErr) + require.ErrorIs(t, br.Signal().Err(), boomErr) + // NB: this can't use ErrorIs because error marks are a cockroachdb/errors'ism. + require.True(t, errors.Is(br.Signal().Err(), ErrBreakerOpen)) + require.NotNil(t, report) + require.NotNil(t, done) + select { + case <-sig.C(): + err := sig.Err() + require.True(t, errors.Is(err, boomErr), "%+v", err) + default: + t.Fatal("expected channel to be closed") + } + + // New signal channel should similarly be closed + // and report the error. + sig = br.Signal() + select { + case <-sig.C(): + require.True(t, errors.Is(sig.Err(), boomErr)) + default: + t.Fatal("expected channel to be closed") + } + + { + err := br.Signal().Err() + require.True(t, errors.Is(err, ErrBreakerOpen), "%+v", err) + // Feeding the output error into the breaker again should not create + // longer error chains (check pointer equality to verify that nothing + // changed). + br.Report(err) + require.Equal(t, err, br.Signal().Err()) + br.Report(errors.Wrap(br.Signal().Err(), "more stuff")) + require.Equal(t, err, br.Signal().Err()) + } + br.Reset() + require.NoError(t, br.Signal().Err()) + // errFn from above should not return nil now, as per the + // contract on Signal(). However, it has to fall back to + // ErrBreakerOpen as boomErr is now wiped from the Breaker. + select { + case <-sig.C(): + err := sig.Err() + require.True(t, errors.Is(err, ErrBreakerOpen), "%+v", err) + default: + t.Fatal("expected channel to be closed") + } + + // Channel is open again. + sig = br.Signal() + select { + case <-sig.C(): + t.Fatal("channel unexpectedly closed") + default: + require.NoError(t, sig.Err()) + } + + { + // The probe reports an error. That error should update that returned by the + // breaker. + refErr := errors.New("probe error") + report(refErr) + err := br.Signal().Err() + require.True(t, errors.Is(err, refErr), "%+v not marked as %+v", err, refErr) + } + + { + // An error is passed to `br.Report`. This is somewhat unusual, after all + // the point of the breaker is to stop people from trying, but if it + // happens we update the breaker's error just the same. + refErr := errors.New("client error") + br.Report(refErr) + err := br.Signal().Err() + require.True(t, errors.Is(err, refErr), "%+v not marked as %+v", err, refErr) + } + + // Can't reset the breaker by calling `br.Report(nil)`. It's the probe's + // job to reset the breaker. + br.Report(nil) + require.Error(t, br.Signal().Err()) + + { + // The probe finishes. Breaker is still open, so next time anyone + // observes that, another probe is launched. + done() + report, done = nil, nil + require.Error(t, br.Signal().Err()) + require.NotNil(t, report) + require.NotNil(t, done) + } + + { + // The probe reports success this time. The breaker should reset and + // stay that way. + report(nil) + require.NoError(t, br.Signal().Err()) + // The probe finishes. + done() + require.NoError(t, br.Signal().Err()) + require.NoError(t, br.Signal().Err()) + } + + datadriven.RunTest(t, + filepath.Join("testdata", t.Name()+".txt"), + func(t *testing.T, d *datadriven.TestData) string { + return allBuf.String() + }) +} + +// TestBreakerProbeIsReactive verifies that probes are only launched on Report +// and when a client observes Err() != nil. +func TestBreakerProbeIsReactive(t *testing.T) { + defer leaktest.AfterTest(t)() + + allBuf, eh := testLogBridge(t) + var numProbes int32 // atomic + br := NewBreaker(Options{ + Name: "mybreaker", + AsyncProbe: func(_report func(error), _done func()) { + n := atomic.AddInt32(&numProbes, 1) + _report(errors.Errorf("probe error #%d", n)) + _done() + }, + EventHandler: eh, + }) + sig := br.Signal() + + br.Report(errors.New("boom")) + + requireNumProbes := func(t *testing.T, exp int32) { + t.Helper() + testutils.SucceedsSoon(t, func() error { + if act := atomic.LoadInt32(&numProbes); exp != act { + return errors.Errorf("expected %d probes, found %d", exp, act) + } + return nil + }) + } + + // If the probe ran in a busy loop, we'd quickly race past 1 and this test + // would be flaky. If the probe never ran, it would fail too. + time.Sleep(time.Millisecond) + _ = sig.C() // should not trigger another probe + time.Sleep(time.Millisecond) + requireNumProbes(t, 1) + + select { + case <-sig.C(): + default: + t.Fatal("Breaker not tripped") + } + + require.Error(t, sig.Err()) // should trigger probe + + requireNumProbes(t, 2) + + datadriven.RunTest(t, + filepath.Join("testdata", t.Name()+".txt"), + func(t *testing.T, d *datadriven.TestData) string { + return allBuf.String() + }) +} + +func TestBreakerRealistic(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + s := stop.NewStopper() + defer s.Stop(ctx) + var intn func(int) int + { + rnd, seed := randutil.NewTestRand() + t.Logf("seed: %d", seed) + var mu syncutil.Mutex + intn = func(n int) int { + mu.Lock() + defer mu.Unlock() + return rnd.Intn(n) + } + } + const backoff = 1 * time.Millisecond + _, eh := testLogBridge(t) + br := NewBreaker(Options{ + Name: "testbreaker", + EventHandler: eh, + AsyncProbe: func(report func(error), done func()) { + if err := s.RunAsyncTask(ctx, "probe", func(ctx context.Context) { + defer done() + + for i := 1; ; i++ { + select { + case <-time.After(5 * backoff): + // Unblock the breaker 33% of the time. + if intn(3) < 1 { + report(nil /* err */) + return + } + if intn(2) < 1 { + // Report new error 50% of the time. + report(errors.Errorf("probe %d failed", i)) + } + case <-s.ShouldQuiesce(): + return + } + } + }); err != nil { + report(err) + done() + } + }, + }) + const numOpsPerWorker = 10 + const targetTripResetCycles = 5 + + var numTrips int32 // atomic + worker := func(ctx context.Context, t interface { + Logf(string, ...interface{}) + Errorf(string, ...interface{}) + }, idx int) error { + defer t.Logf("w%d: worker done", idx) + var doneOps int + for i := 0; doneOps < numOpsPerWorker; i++ { + runtime.Gosched() + t.Logf("w%d: attempting op%d (attempt %d)", idx, doneOps, i+1) + sig := br.Signal() + if err := sig.Err(); err != nil { + t.Logf("w%d: breaker open; backing off: %s", idx, err) + time.Sleep(backoff) + assert.Equal(t, err, sig.Err()) // invariant: non-nil errFn() never changes + continue + } + // 16.5% chance of handing an error to the breaker, but only a few times + // (so that the test is guaranteed to finish in a bounded amount of time + // in practice despite possible parameter changes) + if intn(6) < 1 && atomic.AddInt32(&numTrips, 1) < targetTripResetCycles { + err := errors.Errorf("error triggered by w%d/op%d", idx, doneOps) + t.Logf("injecting error: %s", err) + br.Report(err) + continue + } + select { + case <-time.After(time.Duration(intn(int(backoff)))): + case <-sig.C(): + err := sig.Err() + t.Logf("w%d: breaker open; aborted ongoing operation and backing off: %s", idx, err) + time.Sleep(backoff) + assert.Equal(t, err, sig.Err()) // invariant: non-nil errFn() never changes + continue + } + doneOps++ + t.Logf("w%d: op%d finished", idx, doneOps) + i = 0 + } + return nil + } + + const numWorkers = 8 + g := ctxgroup.WithContext(ctx) + for idx := 0; idx < numWorkers; idx++ { + idx := idx // copy for the goroutine + g.GoCtx(func(ctx context.Context) error { + return s.RunTaskWithErr(ctx, "worker", func(ctx context.Context) error { + return worker(ctx, t, idx) + }) + }) + } + require.NoError(t, g.Wait()) + // The way the test is set up, we should finish with a healthy breaker. + testutils.SucceedsSoon(t, func() error { + return br.Signal().Err() + }) +} + +func BenchmarkBreaker_Signal(b *testing.B) { + br := NewBreaker(Options{ + Name: redact.Sprint("Breaker"), + AsyncProbe: func(_ func(error), done func()) { + done() // never untrip + }, + EventHandler: &EventLogger{Log: func(redact.StringBuilder) {}}, + }) + + // The point of this benchmark is to verify the absence of allocations when + // calling Signal. + b.ReportAllocs() + for i := 0; i < b.N; i++ { + eac := br.Signal() + _ = eac.C() + if err := eac.Err(); err != nil { + b.Fatal(err) + } + } +} + +func ExampleBreaker_Signal() { + br := NewBreaker(Options{ + Name: redact.Sprint("Breaker"), + AsyncProbe: func(_ func(error), done func()) { + done() // never untrip + }, + EventHandler: &EventLogger{Log: func(builder redact.StringBuilder) { fmt.Println(builder.String()) }}, + }) + + launchWork := func() <-chan time.Time { + return time.After(time.Nanosecond) + } + + for i := 0; i < 3; i++ { + sig := br.Signal() + if err := sig.Err(); err != nil { + // Fail-fast - don't even begin work since the breaker indicates that + // there is a problem. + fmt.Println(err) + return // maybe retry later + } + workDoneCh := launchWork() + select { + case <-workDoneCh: + fmt.Println("work item", i+1, "done") + case <-sig.C(): + // Abort work as the breaker is now tripped. + fmt.Println(sig.Err()) + return // maybe retry later + } + } + + // Output: + // work item 1 done + // work item 2 done + // work item 3 done +} diff --git a/pkg/util/circuit/event_handler.go b/pkg/util/circuit/event_handler.go new file mode 100644 index 000000000000..7d551409eff7 --- /dev/null +++ b/pkg/util/circuit/event_handler.go @@ -0,0 +1,58 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package circuit + +import ( + "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" +) + +// An EventHandler is reported to by circuit breakers. +type EventHandler interface { + OnTrip(_ *Breaker, prev, cur error) + OnProbeLaunched(*Breaker) + OnProbeDone(*Breaker) + OnReset(*Breaker) +} + +// EventLogger is an implementation of EventHandler that relays to a logging +// function. For each event, Log is invoked with a StringBuilder containing +// a log message about the event. +type EventLogger struct { + Log func(redact.StringBuilder) +} + +// OnTrip implements EventHandler. If the previous error is nil, it logs the +// error. If the previous error is not nil and has a different cause than the +// current error, logs a message indicating the old and new error. +func (d *EventLogger) OnTrip(b *Breaker, prev, cur error) { + var buf redact.StringBuilder + opts := b.Opts() + if prev != nil && !errors.Is(errors.Cause(prev), errors.Cause(cur)) { + buf.Printf("%s: now tripped with error: %s (previously: %s)", opts.Name, cur, prev) + } else { + buf.Printf("%s: tripped with error: %s", opts.Name, cur) + } + d.Log(buf) +} + +// OnProbeLaunched implements EventHandler. It is a no-op. +func (d *EventLogger) OnProbeLaunched(*Breaker) {} + +// OnProbeDone implements EventHandler. It is a no-op. +func (d *EventLogger) OnProbeDone(*Breaker) {} + +// OnReset implements EventHandler. It logs a message. +func (d *EventLogger) OnReset(b *Breaker) { + var buf redact.StringBuilder + buf.Printf("%s: breaker reset", b.Opts().Name) + d.Log(buf) +} diff --git a/pkg/util/circuit/options.go b/pkg/util/circuit/options.go new file mode 100644 index 000000000000..97693db8581b --- /dev/null +++ b/pkg/util/circuit/options.go @@ -0,0 +1,47 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package circuit + +import "github.com/cockroachdb/redact" + +// Options are the arguments to NewBreaker. All fields are required. +type Options struct { + // Name is the name of a Breaker and will be mentioned in the errors + // that a particular Breaker generates. + Name redact.RedactableString + + // AsyncProbe is invoked when the Breaker is in a tripped state. The method + // should not block but instead delegate any work that needs to be done to a + // goroutine (the "probe") that can then invoke the methods supplied to it. + // Whenever the probe calls `report`, the error passed to it replaces the + // latest error tripping the probe. `report(nil)` untrips the Breaker. + // `done()` must be invoked when the probe winds down (regardless of whether + // the breaker is still tripped); this lets the Breaker know that if necessary + // AsyncProbe can be invoked again. + // + // It is legitimate for the work triggered by AsyncProbe to be long-running + // (i.e. it could repeatedly check if the condition triggering the breaker has + // resolved, returning only once it has or a timeout has elapsed) or one-shot + // (i.e. making a single attempt, reporting the result, and invoking + // `done()`). + // + // The Breaker will only ever permit one active invocation of the probe (i.e. + // once a probe is launched, no second probe is launched until the first probe + // returns via done()). In addition to this, it will trigger probes only when + // the Breaker first trips, and after that when Breaker.Signal returns a + // non-nil error. In other words, the probe is not spawned when the Breaker is + // not seeing any usage. + AsyncProbe func(report func(error), done func()) + + // EventHandler receives events from the Breaker. For an implementation that + // performs unstructured logging, see EventLogger. + EventHandler EventHandler +} diff --git a/pkg/util/circuit/testdata/TestBreaker.txt b/pkg/util/circuit/testdata/TestBreaker.txt new file mode 100644 index 000000000000..eddcce92ee52 --- /dev/null +++ b/pkg/util/circuit/testdata/TestBreaker.txt @@ -0,0 +1,7 @@ +log +---- +mybreaker: tripped with error: boom +mybreaker: breaker reset +mybreaker: tripped with error: probe error +mybreaker: now tripped with error: client error (previously: probe error) +mybreaker: breaker reset diff --git a/pkg/util/circuit/testdata/TestBreakerProbeIsReactive.txt b/pkg/util/circuit/testdata/TestBreakerProbeIsReactive.txt new file mode 100644 index 000000000000..355ab69ff28b --- /dev/null +++ b/pkg/util/circuit/testdata/TestBreakerProbeIsReactive.txt @@ -0,0 +1,5 @@ +log +---- +mybreaker: tripped with error: boom +mybreaker: now tripped with error: probe error #1 (previously: boom) +mybreaker: now tripped with error: probe error #2 (previously: probe error #1)