From 667b2f4f8e83f5ef617e6e320aad6a4b00a80d4a Mon Sep 17 00:00:00 2001 From: Hiroto Funakoshi Date: Fri, 14 Oct 2022 18:14:23 +0900 Subject: [PATCH] bugfix success handling in the half-open and add flow control (#1805) * bugfix success handling in the half-open and add flow control Signed-off-by: hlts2 * deleted boolean type of isReady function Signed-off-by: hlts2 * add comment and generate test code Signed-off-by: hlts2 * Update internal/circuitbreaker/breaker.go * add isReady function teset Signed-off-by: hlts2 * add success function test Signed-off-by: hlts2 * fix count value Signed-off-by: hlts2 * refactor Signed-off-by: hlts2 Signed-off-by: hlts2 --- internal/circuitbreaker/breaker.go | 33 +- internal/circuitbreaker/breaker_test.go | 406 ++++++++++++++++-------- internal/circuitbreaker/counter.go | 5 + internal/errors/circuitbreaker.go | 4 +- 4 files changed, 300 insertions(+), 148 deletions(-) diff --git a/internal/circuitbreaker/breaker.go b/internal/circuitbreaker/breaker.go index a0d6ca8030..d38dc47b8c 100644 --- a/internal/circuitbreaker/breaker.go +++ b/internal/circuitbreaker/breaker.go @@ -68,8 +68,8 @@ func newBreaker(key string, opts ...BreakerOption) (*breaker, error) { // do executes the function given argument when the current breaker state is "Closed" or "Half-Open". // If the current breaker state is "Open", this function returns ErrCircuitBreakerOpenState. func (b *breaker) do(ctx context.Context, fn func(ctx context.Context) (val interface{}, err error)) (val interface{}, st State, err error) { - if !b.isReady() { - return nil, StateOpen, errors.ErrCircuitBreakerOpenState + if st, err := b.isReady(); err != nil { + return nil, st, err } val, err = fn(ctx) if err != nil { @@ -98,14 +98,33 @@ func (b *breaker) do(ctx context.Context, fn func(ctx context.Context) (val inte // isReady determines the breaker is ready or not. // If the current breaker state is "Closed" or "Half-Open", this function returns true. -func (b *breaker) isReady() (ok bool) { - st := b.currentState() - return st == StateClosed || st == StateHalfOpen +func (b *breaker) isReady() (st State, err error) { + st = b.currentState() + switch st { + case StateOpen: + return st, errors.ErrCircuitBreakerOpenState + case StateHalfOpen: + + // For flow control in the "Half-Open" state. It is limited to 50%. + // If this modulo is used, 1/2 of the requests will be error. And if an error occurs, mark as failures. + cnt := b.count.Load().(*count) + if cnt.Total()%2 == 0 { + cnt.onFail() + return st, errors.ErrCircuitBreakerHalfOpenFlowLimitation + } + } + return st, nil } func (b *breaker) success() { - b.count.Load().(*count).onSuccess() - if st := b.currentState(); st == StateHalfOpen { + cnt := b.count.Load().(*count) + cnt.onSuccess() + + // halfOpenErrShouldTrip.ShouldTrip returns true when the sum of the number of successes and failures is greater than the b.minSamples and when the error rate is greater than the b.halfOpenErrRate. + // In other words, if the error rate is less than the b.halfOpenErrRate, it can be judged that the success rate is high, so this function change to the "Close" state from "Half-Open". + if st := b.currentState(); st == StateHalfOpen && + cnt.Total() >= b.minSamples && + !b.halfOpenErrShouldTrip.ShouldTrip(cnt) { log.Infof("the operation succeeded, circuit breaker state for '%s' changed,\tfrom: %s, to: %s", b.key, st.String(), StateClosed.String()) b.reset() } diff --git a/internal/circuitbreaker/breaker_test.go b/internal/circuitbreaker/breaker_test.go index 7a6f8c5bb7..0348d2637d 100644 --- a/internal/circuitbreaker/breaker_test.go +++ b/internal/circuitbreaker/breaker_test.go @@ -15,6 +15,7 @@ package circuitbreaker import ( "context" + "fmt" "reflect" "sync/atomic" "testing" @@ -38,8 +39,8 @@ func Test_newBreaker(t *testing.T) { args args want want checkFunc func(want, *breaker, error) error - beforeFunc func(args) - afterFunc func(args) + beforeFunc func(*testing.T, args) + afterFunc func(*testing.T, args) } defaultCheckFunc := func(w want, got *breaker, err error) error { if !errors.Is(err, w.err) { @@ -61,6 +62,12 @@ func Test_newBreaker(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T, args args) { + t.Helper() + }, + afterFunc: func(t *testing.T, args args) { + t.Helper() + }, }, */ @@ -75,6 +82,12 @@ func Test_newBreaker(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T, args args) { + t.Helper() + }, + afterFunc: func(t *testing.T, args args) { + t.Helper() + }, } }(), */ @@ -86,10 +99,10 @@ func Test_newBreaker(t *testing.T) { tt.Parallel() defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) if test.beforeFunc != nil { - test.beforeFunc(test.args) + test.beforeFunc(tt, test.args) } if test.afterFunc != nil { - defer test.afterFunc(test.args) + defer test.afterFunc(tt, test.args) } checkFunc := test.checkFunc if test.checkFunc == nil { @@ -134,8 +147,8 @@ func Test_breaker_do(t *testing.T) { fields fields want want checkFunc func(want, interface{}, State, error) error - beforeFunc func(args) - afterFunc func(args) + beforeFunc func(*testing.T, args) + afterFunc func(*testing.T, args) } defaultCheckFunc := func(w want, gotVal interface{}, gotSt State, err error) error { if !errors.Is(err, w.err) { @@ -174,6 +187,12 @@ func Test_breaker_do(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T, args args) { + t.Helper() + }, + afterFunc: func(t *testing.T, args args) { + t.Helper() + }, }, */ @@ -202,6 +221,12 @@ func Test_breaker_do(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T, args args) { + t.Helper() + }, + afterFunc: func(t *testing.T, args args) { + t.Helper() + }, } }(), */ @@ -213,10 +238,10 @@ func Test_breaker_do(t *testing.T) { tt.Parallel() defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) if test.beforeFunc != nil { - test.beforeFunc(test.args) + test.beforeFunc(tt, test.args) } if test.afterFunc != nil { - defer test.afterFunc(test.args) + defer test.afterFunc(tt, test.args) } checkFunc := test.checkFunc if test.checkFunc == nil { @@ -261,70 +286,104 @@ func Test_breaker_isReady(t *testing.T) { closedRefreshExp int64 } type want struct { - wantOk bool + wantSt State + err error } type test struct { name string fields fields want want - checkFunc func(want, bool) error - beforeFunc func() - afterFunc func() + checkFunc func(want, State, error) error + beforeFunc func(*testing.T) + afterFunc func(*testing.T) } - defaultCheckFunc := func(w want, gotOk bool) error { - if !reflect.DeepEqual(gotOk, w.wantOk) { - return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotOk, w.wantOk) + defaultCheckFunc := func(w want, gotSt State, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) + } + if !reflect.DeepEqual(gotSt, w.wantSt) { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotSt, w.wantSt) } return nil } tests := []test{ - // TODO test cases - /* - { - name: "test_case_1", - fields: fields { - key: "", - count: nil, - tripped: 0, - closedErrRate: 0, - closedErrShouldTrip: nil, - halfOpenErrRate: 0, - halfOpenErrShouldTrip: nil, - minSamples: 0, - openTimeout: nil, - openExp: 0, - cloedRefreshTimeout: nil, - closedRefreshExp: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - }, - */ - - // TODO test cases - /* - func() test { - return test { - name: "test_case_2", - fields: fields { - key: "", - count: nil, - tripped: 0, - closedErrRate: 0, - closedErrShouldTrip: nil, - halfOpenErrRate: 0, - halfOpenErrShouldTrip: nil, - minSamples: 0, - openTimeout: nil, - openExp: 0, - cloedRefreshTimeout: nil, - closedRefreshExp: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - } - }(), - */ + func() test { + return test{ + name: "return the StateClose and nil when the current state is Close", + fields: fields{ + key: "insertRPC", + tripped: 0, + closedRefreshExp: time.Now().Add(100 * time.Second).UnixNano(), + }, + want: want{ + wantSt: StateClosed, + err: nil, + }, + checkFunc: defaultCheckFunc, + } + }(), + func() test { + var atCount atomic.Value + atCount.Store(&count{ + successes: 1, + }) + return test{ + name: "return the StateHalfOpen and nil when the current state is HalfOpen", + fields: fields{ + key: "insertRPC", + tripped: 1, + openExp: time.Now().Add(-100 * time.Second).UnixNano(), + count: atCount, + }, + want: want{ + wantSt: StateHalfOpen, + err: nil, + }, + checkFunc: defaultCheckFunc, + } + }(), + func() test { + var atCount atomic.Value + atCount.Store(&count{}) + return test{ + name: "return the StateHalfOpen and error when the current state is HalfOpen but the flow is being limited", + fields: fields{ + key: "insertRPC", + tripped: 1, + openExp: time.Now().Add(-100 * time.Second).UnixNano(), + count: atCount, + }, + want: want{ + wantSt: StateHalfOpen, + err: errors.ErrCircuitBreakerHalfOpenFlowLimitation, + }, + checkFunc: func(w want, s State, err error) error { + if err := defaultCheckFunc(w, s, err); err != nil { + return err + } + cnt := atCount.Load().(*count) + if got := cnt.Fails(); got != 1 { + return fmt.Errorf("failures is not equals. want: %d, but got: %d", 2, got) + } + return nil + }, + } + }(), + func() test { + return test{ + name: "return the StateOpen and error when the current state is Open", + fields: fields{ + key: "insertRPC", + tripped: 1, + openExp: time.Now().Add(100 * time.Second).UnixNano(), + }, + want: want{ + wantSt: StateOpen, + err: errors.ErrCircuitBreakerOpenState, + }, + checkFunc: defaultCheckFunc, + } + }(), } for _, tc := range tests { @@ -333,10 +392,10 @@ func Test_breaker_isReady(t *testing.T) { tt.Parallel() defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) if test.beforeFunc != nil { - test.beforeFunc() + test.beforeFunc(tt) } if test.afterFunc != nil { - defer test.afterFunc() + defer test.afterFunc(tt) } checkFunc := test.checkFunc if test.checkFunc == nil { @@ -357,8 +416,8 @@ func Test_breaker_isReady(t *testing.T) { closedRefreshExp: test.fields.closedRefreshExp, } - gotOk := b.isReady() - if err := checkFunc(test.want, gotOk); err != nil { + gotSt, err := b.isReady() + if err := checkFunc(test.want, gotSt, err); err != nil { tt.Errorf("error = %v", err) } }) @@ -386,60 +445,69 @@ func Test_breaker_success(t *testing.T) { fields fields want want checkFunc func(want) error - beforeFunc func() - afterFunc func() + beforeFunc func(*testing.T) + afterFunc func(*testing.T, *breaker) } defaultCheckFunc := func(w want) error { return nil } tests := []test{ - // TODO test cases - /* - { - name: "test_case_1", - fields: fields { - key: "", - count: nil, - tripped: 0, - closedErrRate: 0, - closedErrShouldTrip: nil, - halfOpenErrRate: 0, - halfOpenErrShouldTrip: nil, - minSamples: 0, - openTimeout: nil, - openExp: 0, - cloedRefreshTimeout: nil, - closedRefreshExp: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - }, - */ - - // TODO test cases - /* - func() test { - return test { - name: "test_case_2", - fields: fields { - key: "", - count: nil, - tripped: 0, - closedErrRate: 0, - closedErrShouldTrip: nil, - halfOpenErrRate: 0, - halfOpenErrShouldTrip: nil, - minSamples: 0, - openTimeout: nil, - openExp: 0, - cloedRefreshTimeout: nil, - closedRefreshExp: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - } - }(), - */ + func() test { + var atCount atomic.Value + atCount.Store(&count{ + successes: 10, + failures: 10, + }) + halfOpenErrRate := float32(0.5) + minSamples := int64(10) + return test{ + name: "the current state change from HalfOpen to Close when the success rate is higher", + fields: fields{ + key: "insertRPC", + count: atCount, + tripped: 1, + openExp: time.Now().Add(-100 * time.Second).UnixNano(), + halfOpenErrRate: halfOpenErrRate, + halfOpenErrShouldTrip: NewRateTripper(halfOpenErrRate, minSamples), + minSamples: minSamples, + }, + checkFunc: defaultCheckFunc, + afterFunc: func(t *testing.T, b *breaker) { + t.Helper() + if b.tripped != 0 { + t.Errorf("state did not change: %d", b.tripped) + } + }, + } + }(), + func() test { + var atCount atomic.Value + atCount.Store(&count{ + successes: 10, + failures: 11, + }) + halfOpenErrRate := float32(0.5) + minSamples := int64(10) + return test{ + name: "the current state do not change from HalfOpen to Close when the success rate is less", + fields: fields{ + key: "insertRPC", + count: atCount, + tripped: 1, + openExp: time.Now().Add(-100 * time.Second).UnixNano(), + halfOpenErrRate: halfOpenErrRate, + halfOpenErrShouldTrip: NewRateTripper(halfOpenErrRate, minSamples), + minSamples: minSamples, + }, + checkFunc: defaultCheckFunc, + afterFunc: func(t *testing.T, b *breaker) { + t.Helper() + if b.tripped != 1 { + t.Errorf("state changed: %d", b.tripped) + } + }, + } + }(), } for _, tc := range tests { @@ -448,10 +516,7 @@ func Test_breaker_success(t *testing.T) { tt.Parallel() defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) if test.beforeFunc != nil { - test.beforeFunc() - } - if test.afterFunc != nil { - defer test.afterFunc() + test.beforeFunc(tt) } checkFunc := test.checkFunc if test.checkFunc == nil { @@ -471,6 +536,9 @@ func Test_breaker_success(t *testing.T) { cloedRefreshTimeout: test.fields.cloedRefreshTimeout, closedRefreshExp: test.fields.closedRefreshExp, } + if test.afterFunc != nil { + defer test.afterFunc(tt, b) + } b.success() if err := checkFunc(test.want); err != nil { @@ -501,8 +569,8 @@ func Test_breaker_fail(t *testing.T) { fields fields want want checkFunc func(want) error - beforeFunc func() - afterFunc func() + beforeFunc func(*testing.T) + afterFunc func(*testing.T) } defaultCheckFunc := func(w want) error { return nil @@ -528,6 +596,12 @@ func Test_breaker_fail(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, }, */ @@ -552,6 +626,12 @@ func Test_breaker_fail(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, } }(), */ @@ -563,10 +643,10 @@ func Test_breaker_fail(t *testing.T) { tt.Parallel() defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) if test.beforeFunc != nil { - test.beforeFunc() + test.beforeFunc(tt) } if test.afterFunc != nil { - defer test.afterFunc() + defer test.afterFunc(tt) } checkFunc := test.checkFunc if test.checkFunc == nil { @@ -618,8 +698,8 @@ func Test_breaker_currentState(t *testing.T) { fields fields want want checkFunc func(want, State) error - beforeFunc func() - afterFunc func() + beforeFunc func(*testing.T) + afterFunc func(*testing.T) } defaultCheckFunc := func(w want, got State) error { if !reflect.DeepEqual(got, w.want) { @@ -648,6 +728,12 @@ func Test_breaker_currentState(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, }, */ @@ -672,6 +758,12 @@ func Test_breaker_currentState(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, } }(), */ @@ -683,10 +775,10 @@ func Test_breaker_currentState(t *testing.T) { tt.Parallel() defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) if test.beforeFunc != nil { - test.beforeFunc() + test.beforeFunc(tt) } if test.afterFunc != nil { - defer test.afterFunc() + defer test.afterFunc(tt) } checkFunc := test.checkFunc if test.checkFunc == nil { @@ -736,8 +828,8 @@ func Test_breaker_reset(t *testing.T) { fields fields want want checkFunc func(want) error - beforeFunc func() - afterFunc func() + beforeFunc func(*testing.T) + afterFunc func(*testing.T) } defaultCheckFunc := func(w want) error { return nil @@ -763,6 +855,12 @@ func Test_breaker_reset(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, }, */ @@ -787,6 +885,12 @@ func Test_breaker_reset(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, } }(), */ @@ -798,10 +902,10 @@ func Test_breaker_reset(t *testing.T) { tt.Parallel() defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) if test.beforeFunc != nil { - test.beforeFunc() + test.beforeFunc(tt) } if test.afterFunc != nil { - defer test.afterFunc() + defer test.afterFunc(tt) } checkFunc := test.checkFunc if test.checkFunc == nil { @@ -851,8 +955,8 @@ func Test_breaker_trip(t *testing.T) { fields fields want want checkFunc func(want) error - beforeFunc func() - afterFunc func() + beforeFunc func(*testing.T) + afterFunc func(*testing.T) } defaultCheckFunc := func(w want) error { return nil @@ -878,6 +982,12 @@ func Test_breaker_trip(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, }, */ @@ -902,6 +1012,12 @@ func Test_breaker_trip(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, } }(), */ @@ -913,10 +1029,10 @@ func Test_breaker_trip(t *testing.T) { tt.Parallel() defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) if test.beforeFunc != nil { - test.beforeFunc() + test.beforeFunc(tt) } if test.afterFunc != nil { - defer test.afterFunc() + defer test.afterFunc(tt) } checkFunc := test.checkFunc if test.checkFunc == nil { @@ -968,8 +1084,8 @@ func Test_breaker_isTripped(t *testing.T) { fields fields want want checkFunc func(want, bool) error - beforeFunc func() - afterFunc func() + beforeFunc func(*testing.T) + afterFunc func(*testing.T) } defaultCheckFunc := func(w want, gotOk bool) error { if !reflect.DeepEqual(gotOk, w.wantOk) { @@ -998,6 +1114,12 @@ func Test_breaker_isTripped(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, }, */ @@ -1022,6 +1144,12 @@ func Test_breaker_isTripped(t *testing.T) { }, want: want{}, checkFunc: defaultCheckFunc, + beforeFunc: func(t *testing.T,) { + t.Helper() + }, + afterFunc: func(t *testing.T,) { + t.Helper() + }, } }(), */ @@ -1033,10 +1161,10 @@ func Test_breaker_isTripped(t *testing.T) { tt.Parallel() defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) if test.beforeFunc != nil { - test.beforeFunc() + test.beforeFunc(tt) } if test.afterFunc != nil { - defer test.afterFunc() + defer test.afterFunc(tt) } checkFunc := test.checkFunc if test.checkFunc == nil { diff --git a/internal/circuitbreaker/counter.go b/internal/circuitbreaker/counter.go index 5ebea3ae3e..bc20e906c7 100644 --- a/internal/circuitbreaker/counter.go +++ b/internal/circuitbreaker/counter.go @@ -16,6 +16,7 @@ package circuitbreaker import "sync/atomic" type Counter interface { + Total() int64 Successes() int64 Fails() int64 } @@ -33,6 +34,10 @@ func (c *count) Fails() (n int64) { return atomic.LoadInt64(&c.failures) } +func (c *count) Total() (n int64) { + return c.Successes() + c.Fails() +} + func (c *count) onSuccess() { atomic.AddInt64(&c.successes, 1) } diff --git a/internal/errors/circuitbreaker.go b/internal/errors/circuitbreaker.go index ac0ca1e5f3..014f0459e0 100644 --- a/internal/errors/circuitbreaker.go +++ b/internal/errors/circuitbreaker.go @@ -18,8 +18,8 @@ import ( ) var ( - // ErrCircuitBreakerTooManyRequests is returned when the CB state is half open and the requests count is over the cb maxRequests. - ErrCircuitBreakerTooManyRequests = errors.New("too many requests") + // ErrCircuitBreakerHalfOpenFlowLimitation is returned in case of flow limitation in half-open state. + ErrCircuitBreakerHalfOpenFlowLimitation = errors.New("circuitbreaker breaker half-open flow limitation") // ErrCircuitBreakerOpenState is returned when the CB state is open. ErrCircuitBreakerOpenState = errors.New("circuit breaker is open") )