From 45cc6b636f0cf10bd7563e569c66a18f9ad2da4a Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 12 Oct 2022 16:11:38 +0900 Subject: [PATCH 1/8] bugfix success handling in the half-open and add flow control Signed-off-by: hlts2 --- internal/circuitbreaker/breaker.go | 26 +++- internal/circuitbreaker/breaker_test.go | 196 +++++++++++++++++++----- internal/circuitbreaker/counter.go | 5 + internal/errors/circuitbreaker.go | 4 +- 4 files changed, 182 insertions(+), 49 deletions(-) diff --git a/internal/circuitbreaker/breaker.go b/internal/circuitbreaker/breaker.go index a0d6ca8030..8ac5699ed1 100644 --- a/internal/circuitbreaker/breaker.go +++ b/internal/circuitbreaker/breaker.go @@ -68,8 +68,8 @@ func newBreaker(key string, opts ...BreakerOption) (*breaker, error) { // do executes the function given argument when the current breaker state is "Closed" or "Half-Open". // If the current breaker state is "Open", this function returns ErrCircuitBreakerOpenState. func (b *breaker) do(ctx context.Context, fn func(ctx context.Context) (val interface{}, err error)) (val interface{}, st State, err error) { - if !b.isReady() { - return nil, StateOpen, errors.ErrCircuitBreakerOpenState + if st, ok, err := b.isReady(); !ok && err != nil { + return nil, st, err } val, err = fn(ctx) if err != nil { @@ -98,14 +98,26 @@ func (b *breaker) do(ctx context.Context, fn func(ctx context.Context) (val inte // isReady determines the breaker is ready or not. // If the current breaker state is "Closed" or "Half-Open", this function returns true. -func (b *breaker) isReady() (ok bool) { - st := b.currentState() - return st == StateClosed || st == StateHalfOpen +func (b *breaker) isReady() (st State, ok bool, err error) { + st = b.currentState() + switch st { + case StateOpen: + return st, false, errors.ErrCircuitBreakerOpenState + case StateHalfOpen: + + // For flow control in the "Half-Open" state. It is limited to 50%. + total := b.count.Load().(*count).Total() + if total != 0 && total%2 == 0 { + return st, false, errors.ErrCircuitBreakerHalfOpenFlowLimitation + } + } + return st, true, nil } func (b *breaker) success() { - b.count.Load().(*count).onSuccess() - if st := b.currentState(); st == StateHalfOpen { + cnt := b.count.Load().(*count) + cnt.onSuccess() + if st := b.currentState(); st == StateHalfOpen && cnt.Total() > b.minSamples && !b.halfOpenErrShouldTrip.ShouldTrip(cnt) { log.Infof("the operation succeeded, circuit breaker state for '%s' changed,\tfrom: %s, to: %s", b.key, st.String(), StateClosed.String()) b.reset() } diff --git a/internal/circuitbreaker/breaker_test.go b/internal/circuitbreaker/breaker_test.go index 7a6f8c5bb7..4e5b8d6189 100644 --- a/internal/circuitbreaker/breaker_test.go +++ b/internal/circuitbreaker/breaker_test.go @@ -38,8 +38,8 @@ func Test_newBreaker(t *testing.T) { args args want want checkFunc func(want, *breaker, error) error - beforeFunc func(args) - afterFunc func(args) + beforeFunc func(*testing.T, args) + afterFunc func(*testing.T, args) } defaultCheckFunc := func(w want, got *breaker, err error) error { if !errors.Is(err, w.err) { @@ -61,6 +61,12 @@ func Test_newBreaker(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T, args args) { + t.Helper() + }, + afterFunc: func(t *testing.T, args args) { + t.Helper() + }, }, */ @@ -75,6 +81,12 @@ func Test_newBreaker(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T, args args) { + t.Helper() + }, + afterFunc: func(t *testing.T, args args) { + t.Helper() + }, } }(), */ @@ -86,10 +98,10 @@ func Test_newBreaker(t *testing.T) { tt.Parallel() defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) if test.beforeFunc != nil { - test.beforeFunc(test.args) + test.beforeFunc(tt, test.args) } if test.afterFunc != nil { - defer test.afterFunc(test.args) + defer test.afterFunc(tt, test.args) } checkFunc := test.checkFunc if test.checkFunc == nil { @@ -134,8 +146,8 @@ func Test_breaker_do(t *testing.T) { fields fields want want checkFunc func(want, interface{}, State, error) error - beforeFunc func(args) - afterFunc func(args) + beforeFunc func(*testing.T, args) + afterFunc func(*testing.T, args) } defaultCheckFunc := func(w want, gotVal interface{}, gotSt State, err error) error { if !errors.Is(err, w.err) { @@ -174,6 +186,12 @@ func Test_breaker_do(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T, args args) { + t.Helper() + }, + afterFunc: func(t *testing.T, args args) { + t.Helper() + }, }, */ @@ -202,6 +220,12 @@ func Test_breaker_do(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T, args args) { + t.Helper() + }, + afterFunc: func(t *testing.T, args args) { + t.Helper() + }, } }(), */ @@ -213,10 +237,10 @@ func Test_breaker_do(t *testing.T) { tt.Parallel() defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) if test.beforeFunc != nil { - test.beforeFunc(test.args) + test.beforeFunc(tt, test.args) } if test.afterFunc != nil { - defer test.afterFunc(test.args) + defer test.afterFunc(tt, test.args) } checkFunc := test.checkFunc if test.checkFunc == nil { @@ -261,17 +285,25 @@ func Test_breaker_isReady(t *testing.T) { closedRefreshExp int64 } type want struct { + wantSt State wantOk bool + err error } type test struct { name string fields fields want want - checkFunc func(want, bool) error - beforeFunc func() - afterFunc func() + checkFunc func(want, State, bool, error) error + beforeFunc func(*testing.T) + afterFunc func(*testing.T) } - defaultCheckFunc := func(w want, gotOk bool) error { + defaultCheckFunc := func(w want, gotSt State, gotOk bool, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) + } + if !reflect.DeepEqual(gotSt, w.wantSt) { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotSt, w.wantSt) + } if !reflect.DeepEqual(gotOk, w.wantOk) { return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotOk, w.wantOk) } @@ -298,6 +330,12 @@ func Test_breaker_isReady(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, }, */ @@ -322,6 +360,12 @@ func Test_breaker_isReady(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, } }(), */ @@ -333,10 +377,10 @@ func Test_breaker_isReady(t *testing.T) { tt.Parallel() defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) if test.beforeFunc != nil { - test.beforeFunc() + test.beforeFunc(tt) } if test.afterFunc != nil { - defer test.afterFunc() + defer test.afterFunc(tt) } checkFunc := test.checkFunc if test.checkFunc == nil { @@ -357,8 +401,8 @@ func Test_breaker_isReady(t *testing.T) { closedRefreshExp: test.fields.closedRefreshExp, } - gotOk := b.isReady() - if err := checkFunc(test.want, gotOk); err != nil { + gotSt, gotOk, err := b.isReady() + if err := checkFunc(test.want, gotSt, gotOk, err); err != nil { tt.Errorf("error = %v", err) } }) @@ -386,8 +430,8 @@ func Test_breaker_success(t *testing.T) { fields fields want want checkFunc func(want) error - beforeFunc func() - afterFunc func() + beforeFunc func(*testing.T) + afterFunc func(*testing.T) } defaultCheckFunc := func(w want) error { return nil @@ -413,6 +457,12 @@ func Test_breaker_success(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, }, */ @@ -437,6 +487,12 @@ func Test_breaker_success(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, } }(), */ @@ -448,10 +504,10 @@ func Test_breaker_success(t *testing.T) { tt.Parallel() defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) if test.beforeFunc != nil { - test.beforeFunc() + test.beforeFunc(tt) } if test.afterFunc != nil { - defer test.afterFunc() + defer test.afterFunc(tt) } checkFunc := test.checkFunc if test.checkFunc == nil { @@ -501,8 +557,8 @@ func Test_breaker_fail(t *testing.T) { fields fields want want checkFunc func(want) error - beforeFunc func() - afterFunc func() + beforeFunc func(*testing.T) + afterFunc func(*testing.T) } defaultCheckFunc := func(w want) error { return nil @@ -528,6 +584,12 @@ func Test_breaker_fail(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, }, */ @@ -552,6 +614,12 @@ func Test_breaker_fail(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, } }(), */ @@ -563,10 +631,10 @@ func Test_breaker_fail(t *testing.T) { tt.Parallel() defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) if test.beforeFunc != nil { - test.beforeFunc() + test.beforeFunc(tt) } if test.afterFunc != nil { - defer test.afterFunc() + defer test.afterFunc(tt) } checkFunc := test.checkFunc if test.checkFunc == nil { @@ -618,8 +686,8 @@ func Test_breaker_currentState(t *testing.T) { fields fields want want checkFunc func(want, State) error - beforeFunc func() - afterFunc func() + beforeFunc func(*testing.T) + afterFunc func(*testing.T) } defaultCheckFunc := func(w want, got State) error { if !reflect.DeepEqual(got, w.want) { @@ -648,6 +716,12 @@ func Test_breaker_currentState(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, }, */ @@ -672,6 +746,12 @@ func Test_breaker_currentState(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, } }(), */ @@ -683,10 +763,10 @@ func Test_breaker_currentState(t *testing.T) { tt.Parallel() defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) if test.beforeFunc != nil { - test.beforeFunc() + test.beforeFunc(tt) } if test.afterFunc != nil { - defer test.afterFunc() + defer test.afterFunc(tt) } checkFunc := test.checkFunc if test.checkFunc == nil { @@ -736,8 +816,8 @@ func Test_breaker_reset(t *testing.T) { fields fields want want checkFunc func(want) error - beforeFunc func() - afterFunc func() + beforeFunc func(*testing.T) + afterFunc func(*testing.T) } defaultCheckFunc := func(w want) error { return nil @@ -763,6 +843,12 @@ func Test_breaker_reset(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, }, */ @@ -787,6 +873,12 @@ func Test_breaker_reset(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, } }(), */ @@ -798,10 +890,10 @@ func Test_breaker_reset(t *testing.T) { tt.Parallel() defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) if test.beforeFunc != nil { - test.beforeFunc() + test.beforeFunc(tt) } if test.afterFunc != nil { - defer test.afterFunc() + defer test.afterFunc(tt) } checkFunc := test.checkFunc if test.checkFunc == nil { @@ -851,8 +943,8 @@ func Test_breaker_trip(t *testing.T) { fields fields want want checkFunc func(want) error - beforeFunc func() - afterFunc func() + beforeFunc func(*testing.T) + afterFunc func(*testing.T) } defaultCheckFunc := func(w want) error { return nil @@ -878,6 +970,12 @@ func Test_breaker_trip(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, }, */ @@ -902,6 +1000,12 @@ func Test_breaker_trip(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, } }(), */ @@ -913,10 +1017,10 @@ func Test_breaker_trip(t *testing.T) { tt.Parallel() defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) if test.beforeFunc != nil { - test.beforeFunc() + test.beforeFunc(tt) } if test.afterFunc != nil { - defer test.afterFunc() + defer test.afterFunc(tt) } checkFunc := test.checkFunc if test.checkFunc == nil { @@ -968,8 +1072,8 @@ func Test_breaker_isTripped(t *testing.T) { fields fields want want checkFunc func(want, bool) error - beforeFunc func() - afterFunc func() + beforeFunc func(*testing.T) + afterFunc func(*testing.T) } defaultCheckFunc := func(w want, gotOk bool) error { if !reflect.DeepEqual(gotOk, w.wantOk) { @@ -998,6 +1102,12 @@ func Test_breaker_isTripped(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, }, */ @@ -1022,6 +1132,12 @@ func Test_breaker_isTripped(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, } }(), */ @@ -1033,10 +1149,10 @@ func Test_breaker_isTripped(t *testing.T) { tt.Parallel() defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) if test.beforeFunc != nil { - test.beforeFunc() + test.beforeFunc(tt) } if test.afterFunc != nil { - defer test.afterFunc() + defer test.afterFunc(tt) } checkFunc := test.checkFunc if test.checkFunc == nil { diff --git a/internal/circuitbreaker/counter.go b/internal/circuitbreaker/counter.go index 5ebea3ae3e..bc20e906c7 100644 --- a/internal/circuitbreaker/counter.go +++ b/internal/circuitbreaker/counter.go @@ -16,6 +16,7 @@ package circuitbreaker import "sync/atomic" type Counter interface { + Total() int64 Successes() int64 Fails() int64 } @@ -33,6 +34,10 @@ func (c *count) Fails() (n int64) { return atomic.LoadInt64(&c.failures) } +func (c *count) Total() (n int64) { + return c.Successes() + c.Fails() +} + func (c *count) onSuccess() { atomic.AddInt64(&c.successes, 1) } diff --git a/internal/errors/circuitbreaker.go b/internal/errors/circuitbreaker.go index ac0ca1e5f3..014f0459e0 100644 --- a/internal/errors/circuitbreaker.go +++ b/internal/errors/circuitbreaker.go @@ -18,8 +18,8 @@ import ( ) var ( - // ErrCircuitBreakerTooManyRequests is returned when the CB state is half open and the requests count is over the cb maxRequests. - ErrCircuitBreakerTooManyRequests = errors.New("too many requests") + // ErrCircuitBreakerHalfOpenFlowLimitation is returned in case of flow limitation in half-open state. + ErrCircuitBreakerHalfOpenFlowLimitation = errors.New("circuitbreaker breaker half-open flow limitation") // ErrCircuitBreakerOpenState is returned when the CB state is open. ErrCircuitBreakerOpenState = errors.New("circuit breaker is open") ) From bc8a80bc68decb3f2394f272447f310870b7b921 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 12 Oct 2022 16:20:00 +0900 Subject: [PATCH 2/8] deleted boolean type of isReady function Signed-off-by: hlts2 --- internal/circuitbreaker/breaker.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/circuitbreaker/breaker.go b/internal/circuitbreaker/breaker.go index 8ac5699ed1..cd39024e8d 100644 --- a/internal/circuitbreaker/breaker.go +++ b/internal/circuitbreaker/breaker.go @@ -68,7 +68,7 @@ func newBreaker(key string, opts ...BreakerOption) (*breaker, error) { // do executes the function given argument when the current breaker state is "Closed" or "Half-Open". // If the current breaker state is "Open", this function returns ErrCircuitBreakerOpenState. func (b *breaker) do(ctx context.Context, fn func(ctx context.Context) (val interface{}, err error)) (val interface{}, st State, err error) { - if st, ok, err := b.isReady(); !ok && err != nil { + if st, err := b.isReady(); err != nil { return nil, st, err } val, err = fn(ctx) @@ -98,20 +98,20 @@ func (b *breaker) do(ctx context.Context, fn func(ctx context.Context) (val inte // isReady determines the breaker is ready or not. // If the current breaker state is "Closed" or "Half-Open", this function returns true. -func (b *breaker) isReady() (st State, ok bool, err error) { +func (b *breaker) isReady() (st State, err error) { st = b.currentState() switch st { case StateOpen: - return st, false, errors.ErrCircuitBreakerOpenState + return st, errors.ErrCircuitBreakerOpenState case StateHalfOpen: // For flow control in the "Half-Open" state. It is limited to 50%. total := b.count.Load().(*count).Total() if total != 0 && total%2 == 0 { - return st, false, errors.ErrCircuitBreakerHalfOpenFlowLimitation + return st, errors.ErrCircuitBreakerHalfOpenFlowLimitation } } - return st, true, nil + return st, nil } func (b *breaker) success() { From 22099a0f98e1800be7e45c3ae71a536d26c114c7 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 12 Oct 2022 16:33:21 +0900 Subject: [PATCH 3/8] add comment and generate test code Signed-off-by: hlts2 --- internal/circuitbreaker/breaker.go | 6 +++++- internal/circuitbreaker/breaker_test.go | 12 ++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/internal/circuitbreaker/breaker.go b/internal/circuitbreaker/breaker.go index cd39024e8d..d900f326ec 100644 --- a/internal/circuitbreaker/breaker.go +++ b/internal/circuitbreaker/breaker.go @@ -106,6 +106,7 @@ func (b *breaker) isReady() (st State, err error) { case StateHalfOpen: // For flow control in the "Half-Open" state. It is limited to 50%. + // If this modulo is used, 1/2 of the requests will be error. total := b.count.Load().(*count).Total() if total != 0 && total%2 == 0 { return st, errors.ErrCircuitBreakerHalfOpenFlowLimitation @@ -117,7 +118,10 @@ func (b *breaker) isReady() (st State, err error) { func (b *breaker) success() { cnt := b.count.Load().(*count) cnt.onSuccess() - if st := b.currentState(); st == StateHalfOpen && cnt.Total() > b.minSamples && !b.halfOpenErrShouldTrip.ShouldTrip(cnt) { + + // halfOpenErrShouldTrip.ShouldTrip is true when the sum of the number of successes and failures is greater than the b.minSamples and when the error rate is greater than the b.halfOpenErrRate. + // In other words, if the error rate is less than the b.halfOpenErrRate, it can be judged that the success rate is high, so this function change to the "Close" state from "Half-Open". + if st := b.currentState(); st == StateHalfOpen && cnt.Total() >= b.minSamples && !b.halfOpenErrShouldTrip.ShouldTrip(cnt) { log.Infof("the operation succeeded, circuit breaker state for '%s' changed,\tfrom: %s, to: %s", b.key, st.String(), StateClosed.String()) b.reset() } diff --git a/internal/circuitbreaker/breaker_test.go b/internal/circuitbreaker/breaker_test.go index 4e5b8d6189..d6e9639fdb 100644 --- a/internal/circuitbreaker/breaker_test.go +++ b/internal/circuitbreaker/breaker_test.go @@ -286,27 +286,23 @@ func Test_breaker_isReady(t *testing.T) { } type want struct { wantSt State - wantOk bool err error } type test struct { name string fields fields want want - checkFunc func(want, State, bool, error) error + checkFunc func(want, State, error) error beforeFunc func(*testing.T) afterFunc func(*testing.T) } - defaultCheckFunc := func(w want, gotSt State, gotOk bool, err error) error { + defaultCheckFunc := func(w want, gotSt State, err error) error { if !errors.Is(err, w.err) { return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) } if !reflect.DeepEqual(gotSt, w.wantSt) { return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotSt, w.wantSt) } - if !reflect.DeepEqual(gotOk, w.wantOk) { - return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotOk, w.wantOk) - } return nil } tests := []test{ @@ -401,8 +397,8 @@ func Test_breaker_isReady(t *testing.T) { closedRefreshExp: test.fields.closedRefreshExp, } - gotSt, gotOk, err := b.isReady() - if err := checkFunc(test.want, gotSt, gotOk, err); err != nil { + gotSt, err := b.isReady() + if err := checkFunc(test.want, gotSt, err); err != nil { tt.Errorf("error = %v", err) } }) From d1939084837ccd4173dd8fa1ad3786249d1485af Mon Sep 17 00:00:00 2001 From: Hiroto Funakoshi Date: Wed, 12 Oct 2022 16:34:59 +0900 Subject: [PATCH 4/8] Update internal/circuitbreaker/breaker.go --- internal/circuitbreaker/breaker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/circuitbreaker/breaker.go b/internal/circuitbreaker/breaker.go index d900f326ec..3a7fa09b5e 100644 --- a/internal/circuitbreaker/breaker.go +++ b/internal/circuitbreaker/breaker.go @@ -119,7 +119,7 @@ func (b *breaker) success() { cnt := b.count.Load().(*count) cnt.onSuccess() - // halfOpenErrShouldTrip.ShouldTrip is true when the sum of the number of successes and failures is greater than the b.minSamples and when the error rate is greater than the b.halfOpenErrRate. + // halfOpenErrShouldTrip.ShouldTrip returns true when the sum of the number of successes and failures is greater than the b.minSamples and when the error rate is greater than the b.halfOpenErrRate. // In other words, if the error rate is less than the b.halfOpenErrRate, it can be judged that the success rate is high, so this function change to the "Close" state from "Half-Open". if st := b.currentState(); st == StateHalfOpen && cnt.Total() >= b.minSamples && !b.halfOpenErrShouldTrip.ShouldTrip(cnt) { log.Infof("the operation succeeded, circuit breaker state for '%s' changed,\tfrom: %s, to: %s", b.key, st.String(), StateClosed.String()) From 44061c736d27a2b804a5763768fca3869e9287aa Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 12 Oct 2022 17:11:20 +0900 Subject: [PATCH 5/8] add isReady function teset Signed-off-by: hlts2 --- internal/circuitbreaker/breaker.go | 8 +- internal/circuitbreaker/breaker_test.go | 137 ++++++++++++++---------- 2 files changed, 83 insertions(+), 62 deletions(-) diff --git a/internal/circuitbreaker/breaker.go b/internal/circuitbreaker/breaker.go index 3a7fa09b5e..e3223a6d8a 100644 --- a/internal/circuitbreaker/breaker.go +++ b/internal/circuitbreaker/breaker.go @@ -106,9 +106,11 @@ func (b *breaker) isReady() (st State, err error) { case StateHalfOpen: // For flow control in the "Half-Open" state. It is limited to 50%. - // If this modulo is used, 1/2 of the requests will be error. - total := b.count.Load().(*count).Total() - if total != 0 && total%2 == 0 { + // If this modulo is used, 1/2 of the requests will be error. And if an error occurs, mark as failures. + cnt := b.count.Load().(*count) + total := cnt.Total() + if total%2 == 0 { + cnt.onFail() return st, errors.ErrCircuitBreakerHalfOpenFlowLimitation } } diff --git a/internal/circuitbreaker/breaker_test.go b/internal/circuitbreaker/breaker_test.go index d6e9639fdb..95c6ae05ef 100644 --- a/internal/circuitbreaker/breaker_test.go +++ b/internal/circuitbreaker/breaker_test.go @@ -15,6 +15,7 @@ package circuitbreaker import ( "context" + "fmt" "reflect" "sync/atomic" "testing" @@ -306,65 +307,83 @@ func Test_breaker_isReady(t *testing.T) { return nil } tests := []test{ - // TODO test cases - /* - { - name: "test_case_1", - fields: fields { - key: "", - count: nil, - tripped: 0, - closedErrRate: 0, - closedErrShouldTrip: nil, - halfOpenErrRate: 0, - halfOpenErrShouldTrip: nil, - minSamples: 0, - openTimeout: nil, - openExp: 0, - cloedRefreshTimeout: nil, - closedRefreshExp: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - beforeFunc: func(t *testing.T,) { - t.Helper() - }, - afterFunc: func(t *testing.T,) { - t.Helper() - }, - }, - */ - - // TODO test cases - /* - func() test { - return test { - name: "test_case_2", - fields: fields { - key: "", - count: nil, - tripped: 0, - closedErrRate: 0, - closedErrShouldTrip: nil, - halfOpenErrRate: 0, - halfOpenErrShouldTrip: nil, - minSamples: 0, - openTimeout: nil, - openExp: 0, - cloedRefreshTimeout: nil, - closedRefreshExp: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - beforeFunc: func(t *testing.T,) { - t.Helper() - }, - afterFunc: func(t *testing.T,) { - t.Helper() - }, - } - }(), - */ + func() test { + return test{ + name: "return the StateClose and nil when the current state is Close", + fields: fields{ + key: "insertRPC", + tripped: 0, + closedRefreshExp: time.Now().Add(100 * time.Second).UnixNano(), + }, + want: want{ + wantSt: StateClosed, + err: nil, + }, + checkFunc: defaultCheckFunc, + } + }(), + func() test { + var atCount atomic.Value + atCount.Store(&count{ + successes: 1, + }) + return test{ + name: "return the StateHalfOpen and nil when the current state is HalfOpen", + fields: fields{ + key: "insertRPC", + tripped: 1, + openExp: time.Now().Add(-100 * time.Second).UnixNano(), + count: atCount, + }, + want: want{ + wantSt: StateHalfOpen, + err: nil, + }, + checkFunc: defaultCheckFunc, + } + }(), + func() test { + var atCount atomic.Value + atCount.Store(&count{}) + return test{ + name: "return the StateHalfOpen and error when the current state is HalfOpen but the flow is being limited", + fields: fields{ + key: "insertRPC", + tripped: 1, + openExp: time.Now().Add(-100 * time.Second).UnixNano(), + count: atCount, + }, + want: want{ + wantSt: StateHalfOpen, + err: errors.ErrCircuitBreakerHalfOpenFlowLimitation, + }, + checkFunc: func(w want, s State, err error) error { + if err := defaultCheckFunc(w, s, err); err != nil { + return err + } + cnt := atCount.Load().(*count) + if got := cnt.Fails(); got != 1 { + return fmt.Errorf("failures is not equals. want: %d, but got: %d", 2, got) + } + return nil + }, + } + }(), + func() test { + return test{ + name: "return the StateOpen and error when the current state is Open", + fields: fields{ + key: "insertRPC", + tripped: 1, + openExp: time.Now().Add(100 * time.Second).UnixNano(), + }, + want: want{ + wantSt: StateOpen, + err: errors.ErrCircuitBreakerOpenState, + }, + checkFunc: defaultCheckFunc, + } + }(), } for _, tc := range tests { From c9e39431d404eab539522154b3f2f70d43f886d0 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 12 Oct 2022 17:38:27 +0900 Subject: [PATCH 6/8] add success function test Signed-off-by: hlts2 --- internal/circuitbreaker/breaker_test.go | 123 ++++++++++++------------ 1 file changed, 60 insertions(+), 63 deletions(-) diff --git a/internal/circuitbreaker/breaker_test.go b/internal/circuitbreaker/breaker_test.go index 95c6ae05ef..a659e6efa5 100644 --- a/internal/circuitbreaker/breaker_test.go +++ b/internal/circuitbreaker/breaker_test.go @@ -446,71 +446,68 @@ func Test_breaker_success(t *testing.T) { want want checkFunc func(want) error beforeFunc func(*testing.T) - afterFunc func(*testing.T) + afterFunc func(*testing.T, *breaker) } defaultCheckFunc := func(w want) error { return nil } tests := []test{ - // TODO test cases - /* - { - name: "test_case_1", - fields: fields { - key: "", - count: nil, - tripped: 0, - closedErrRate: 0, - closedErrShouldTrip: nil, - halfOpenErrRate: 0, - halfOpenErrShouldTrip: nil, - minSamples: 0, - openTimeout: nil, - openExp: 0, - cloedRefreshTimeout: nil, - closedRefreshExp: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - beforeFunc: func(t *testing.T,) { - t.Helper() - }, - afterFunc: func(t *testing.T,) { - t.Helper() - }, - }, - */ - - // TODO test cases - /* - func() test { - return test { - name: "test_case_2", - fields: fields { - key: "", - count: nil, - tripped: 0, - closedErrRate: 0, - closedErrShouldTrip: nil, - halfOpenErrRate: 0, - halfOpenErrShouldTrip: nil, - minSamples: 0, - openTimeout: nil, - openExp: 0, - cloedRefreshTimeout: nil, - closedRefreshExp: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - beforeFunc: func(t *testing.T,) { - t.Helper() - }, - afterFunc: func(t *testing.T,) { - t.Helper() - }, - } - }(), - */ + func() test { + var atCount atomic.Value + atCount.Store(&count{ + successes: 10, + failures: 1, + }) + halfOpenErrRate := float32(0.5) + minSamples := int64(10) + return test{ + name: "the current state change from HalfOpen to Close when the success rate is higher", + fields: fields{ + key: "insertRPC", + count: atCount, + tripped: 1, + openExp: time.Now().Add(-100 * time.Second).UnixNano(), + halfOpenErrRate: halfOpenErrRate, + halfOpenErrShouldTrip: NewRateTripper(halfOpenErrRate, minSamples), + minSamples: minSamples, + }, + checkFunc: defaultCheckFunc, + afterFunc: func(t *testing.T, b *breaker) { + t.Helper() + if b.tripped != 0 { + t.Errorf("state did not change: %d", b.tripped) + } + }, + } + }(), + func() test { + var atCount atomic.Value + atCount.Store(&count{ + successes: 1, + failures: 10, + }) + halfOpenErrRate := float32(0.5) + minSamples := int64(10) + return test{ + name: "the current state do not change from HalfOpen to Close when the success rate is less", + fields: fields{ + key: "insertRPC", + count: atCount, + tripped: 1, + openExp: time.Now().Add(-100 * time.Second).UnixNano(), + halfOpenErrRate: halfOpenErrRate, + halfOpenErrShouldTrip: NewRateTripper(halfOpenErrRate, minSamples), + minSamples: minSamples, + }, + checkFunc: defaultCheckFunc, + afterFunc: func(t *testing.T, b *breaker) { + t.Helper() + if b.tripped != 1 { + t.Errorf("state changed: %d", b.tripped) + } + }, + } + }(), } for _, tc := range tests { @@ -521,9 +518,6 @@ func Test_breaker_success(t *testing.T) { if test.beforeFunc != nil { test.beforeFunc(tt) } - if test.afterFunc != nil { - defer test.afterFunc(tt) - } checkFunc := test.checkFunc if test.checkFunc == nil { checkFunc = defaultCheckFunc @@ -542,6 +536,9 @@ func Test_breaker_success(t *testing.T) { cloedRefreshTimeout: test.fields.cloedRefreshTimeout, closedRefreshExp: test.fields.closedRefreshExp, } + if test.afterFunc != nil { + defer test.afterFunc(tt, b) + } b.success() if err := checkFunc(test.want); err != nil { From 0351ac7dd8a4b2ef106c7f5fcbf7258705a89a72 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 12 Oct 2022 17:56:34 +0900 Subject: [PATCH 7/8] fix count value Signed-off-by: hlts2 --- internal/circuitbreaker/breaker_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/circuitbreaker/breaker_test.go b/internal/circuitbreaker/breaker_test.go index a659e6efa5..0348d2637d 100644 --- a/internal/circuitbreaker/breaker_test.go +++ b/internal/circuitbreaker/breaker_test.go @@ -456,7 +456,7 @@ func Test_breaker_success(t *testing.T) { var atCount atomic.Value atCount.Store(&count{ successes: 10, - failures: 1, + failures: 10, }) halfOpenErrRate := float32(0.5) minSamples := int64(10) @@ -483,8 +483,8 @@ func Test_breaker_success(t *testing.T) { func() test { var atCount atomic.Value atCount.Store(&count{ - successes: 1, - failures: 10, + successes: 10, + failures: 11, }) halfOpenErrRate := float32(0.5) minSamples := int64(10) From 2dc089ff48de30aab71cc89a462468b519a14fc0 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 12 Oct 2022 18:02:17 +0900 Subject: [PATCH 8/8] refactor Signed-off-by: hlts2 --- internal/circuitbreaker/breaker.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/circuitbreaker/breaker.go b/internal/circuitbreaker/breaker.go index e3223a6d8a..d38dc47b8c 100644 --- a/internal/circuitbreaker/breaker.go +++ b/internal/circuitbreaker/breaker.go @@ -108,8 +108,7 @@ func (b *breaker) isReady() (st State, err error) { // For flow control in the "Half-Open" state. It is limited to 50%. // If this modulo is used, 1/2 of the requests will be error. And if an error occurs, mark as failures. cnt := b.count.Load().(*count) - total := cnt.Total() - if total%2 == 0 { + if cnt.Total()%2 == 0 { cnt.onFail() return st, errors.ErrCircuitBreakerHalfOpenFlowLimitation } @@ -123,7 +122,9 @@ func (b *breaker) success() { // halfOpenErrShouldTrip.ShouldTrip returns true when the sum of the number of successes and failures is greater than the b.minSamples and when the error rate is greater than the b.halfOpenErrRate. // In other words, if the error rate is less than the b.halfOpenErrRate, it can be judged that the success rate is high, so this function change to the "Close" state from "Half-Open". - if st := b.currentState(); st == StateHalfOpen && cnt.Total() >= b.minSamples && !b.halfOpenErrShouldTrip.ShouldTrip(cnt) { + if st := b.currentState(); st == StateHalfOpen && + cnt.Total() >= b.minSamples && + !b.halfOpenErrShouldTrip.ShouldTrip(cnt) { log.Infof("the operation succeeded, circuit breaker state for '%s' changed,\tfrom: %s, to: %s", b.key, st.String(), StateClosed.String()) b.reset() }