-
Notifications
You must be signed in to change notification settings - Fork 78
/
Copy pathmanager.go
89 lines (78 loc) · 2.11 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package circuitbreaker
import (
"context"
"reflect"
"sync"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
)
// NOTE: This package-level state exists for the observability package.
// This will be fixed when refactoring the observability package.
var (
	// mu guards every read and write of metrics.
	mu sync.RWMutex
	// metrics holds call counters: map[breaker_name]map[state]count.
	metrics = map[string]map[State]int64{}
)
// CircuitBreaker is a state machine to prevent doing processes that are likely to fail.
// The implementation returned by NewCircuitBreaker keeps a separate breaker per key.
type CircuitBreaker interface {
// Do runs fn through the breaker associated with key and returns the value
// and error reported by that breaker.
Do(ctx context.Context, key string, fn func(ctx context.Context) (interface{}, error)) (val interface{}, err error)
}
// breakerManager is the CircuitBreaker implementation returned by
// NewCircuitBreaker. It holds one breaker per key and creates them lazily in Do.
type breakerManager struct {
m sync.Map // breaker group. key: string, value: *breaker.
// opts is forwarded to newBreaker each time a breaker is created for a new key.
opts []BreakerOption
}
// NewCircuitBreaker builds a CircuitBreaker by applying defaultOpts followed
// by the caller-supplied opts, in that order.
//
// When an option fails, the failure is wrapped with errors.ErrOptionFailed.
// If the wrapped error matches *errors.ErrCriticalOption it is logged at
// error level and construction stops, returning (nil, wrapped error). Any
// other option failure is logged as a warning and the remaining options are
// still applied.
func NewCircuitBreaker(opts ...Option) (CircuitBreaker, error) {
	mgr := new(breakerManager)
	for _, apply := range append(defaultOpts, opts...) {
		err := apply(mgr)
		if err == nil {
			continue
		}
		wrapped := errors.ErrOptionFailed(err, reflect.ValueOf(apply))
		critical := &errors.ErrCriticalOption{}
		if !errors.As(wrapped, &critical) {
			// Non-critical failure: report it and keep going.
			log.Warn(wrapped)
			continue
		}
		log.Error(wrapped)
		return nil, wrapped
	}
	return mgr, nil
}
// Do runs fn through the breaker registered under key, creating and
// registering that breaker first when the key has not been seen before.
//
// It returns the value and error reported by the breaker. If the breaker
// cannot be constructed, fn is not invoked and the construction error is
// returned together with a nil value.
//
// Every call, including one that fails to construct a breaker, increments the
// package-level metrics counter for key under the State reported by the
// breaker (the zero State when no breaker ran).
func (bm *breakerManager) Do(ctx context.Context, key string, fn func(ctx context.Context) (interface{}, error)) (val interface{}, err error) {
	var st State
	defer func() {
		// Runs after the call so that st holds the state the breaker reported.
		mu.Lock()
		defer mu.Unlock()
		counts, ok := metrics[key]
		if !ok {
			counts = make(map[State]int64, 1)
			metrics[key] = counts
		}
		counts[st]++
	}()

	// Look the breaker up first so the common path does not build a breaker
	// that LoadOrStore would immediately throw away.
	entry, found := bm.m.Load(key)
	if !found {
		fresh, nerr := newBreaker(key, bm.opts...)
		if nerr != nil {
			return nil, nerr
		}
		// Another goroutine may have stored a breaker for key in the meantime;
		// LoadOrStore hands back whichever one was stored first.
		entry, _ = bm.m.LoadOrStore(key, fresh)
	}
	val, st, err = entry.(*breaker).do(ctx, fn)
	return val, err
}
// Metrics returns a snapshot of the counters recorded by Do, keyed by breaker
// name and then by State. It returns nil when nothing has been recorded yet.
//
// The context argument is currently unused.
//
// The result is a deep copy: both the outer map and every per-breaker map are
// freshly allocated, so callers may read or modify the snapshot without
// holding any lock and without affecting, or racing with, later Do calls.
func Metrics(_ context.Context) map[string]map[State]int64 {
	mu.RLock()
	defer mu.RUnlock()

	if len(metrics) == 0 {
		return nil
	}
	snapshot := make(map[string]map[State]int64, len(metrics))
	for name, counts := range metrics {
		// Copy the inner map as well. Handing out the live map would let the
		// caller read it after the lock is released while Do keeps writing to it.
		cp := make(map[State]int64, len(counts))
		for st, n := range counts {
			cp[st] = n
		}
		snapshot[name] = cp
	}
	return snapshot
}