From e1748b361233dfa970e62f6d296baff0ab00f849 Mon Sep 17 00:00:00 2001 From: Hobo86 Date: Sat, 27 Jul 2019 05:23:50 +0800 Subject: [PATCH] add dispatcher log and pool pkg --- README.md | 6 ++ dispatcher/README.md | 2 + dispatcher/dispatch_state.go | 17 +++++ dispatcher/dispatcher.go | 62 +++++++++++++++++ dispatcher/session.go | 78 ++++++++++++++++++++++ dispatcher/type.go | 16 +++++ go.mod | 6 +- go.sum | 40 ++++++++++- log/default.go | 95 ++++++++++++++++++++++++++ log/logger.go | 125 +++++++++++++++++++++++++++++++++++ pool/README.md | 2 + pool/buffer.go | 36 ++++++++++ pool/buffer_test.go | 37 +++++++++++ pool/goroutine.go | 109 ++++++++++++++++++++++++++++++ pool/goroutine_test.go | 56 ++++++++++++++++ pool/intern.go | 87 ++++++++++++++++++++++++ pool/intern_test.go | 51 ++++++++++++++ rate/rate.go | 5 +- 18 files changed, 825 insertions(+), 5 deletions(-) create mode 100644 dispatcher/README.md create mode 100644 dispatcher/dispatch_state.go create mode 100644 dispatcher/dispatcher.go create mode 100644 dispatcher/session.go create mode 100644 dispatcher/type.go create mode 100644 log/default.go create mode 100644 log/logger.go create mode 100644 pool/README.md create mode 100644 pool/buffer.go create mode 100644 pool/buffer_test.go create mode 100644 pool/goroutine.go create mode 100644 pool/goroutine_test.go create mode 100644 pool/intern.go create mode 100644 pool/intern_test.go diff --git a/README.md b/README.md index e97e336..7347583 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,8 @@ # pkg go common package + +- Feature + - dispatcher + - log + - pool + - rate \ No newline at end of file diff --git a/dispatcher/README.md b/dispatcher/README.md new file mode 100644 index 0000000..c89351d --- /dev/null +++ b/dispatcher/README.md @@ -0,0 +1,2 @@ +# Dispatch Goroutine +ref: github.com/istio/istio/mixer/pkg/runtime/dispatcher \ No newline at end of file diff --git a/dispatcher/dispatch_state.go b/dispatcher/dispatch_state.go new file mode 100644 index 0000000..5d52873 --- /dev/null +++ b/dispatcher/dispatch_state.go @@ -0,0 +1,17 @@ +package dispatcher + +type dispatchState struct { + session *session + handler DispatchHandler + err error +} + +func (ds *dispatchState) clear() { + ds.session = nil + ds.err = nil +} + +func (ds *dispatchState) invokeHandler(p interface{}) { + ds.err = ds.handler(p) + ds.session.completed <- ds +} diff --git a/dispatcher/dispatcher.go b/dispatcher/dispatcher.go new file mode 100644 index 0000000..5f65edf --- /dev/null +++ b/dispatcher/dispatcher.go @@ -0,0 +1,62 @@ +package dispatcher + +import ( + "sync" + + "github.com/hb-go/pkg/pool" +) + +type DispatchHandler func(interface{}) error + +type Dispatcher struct { + // pool of sessions + sessionPool sync.Pool + + // pool of dispatch states + statePool sync.Pool + + // pool of goroutines + gp *pool.GoroutinePool +} + +func NewDispatcher(handlerGP *pool.GoroutinePool) *Dispatcher { + d := &Dispatcher{ + gp: handlerGP, + } + + d.sessionPool.New = func() interface{} { return &session{} } + d.statePool.New = func() interface{} { return &dispatchState{} } + return d +} + +func (d *Dispatcher) Dispatch(handlers ...DispatchHandler) error { + s := d.getSession() + + s.handlers = handlers + err := s.dispatch() + + d.putSession(s) + return err +} + +func (d *Dispatcher) getSession() *session { + s := d.sessionPool.Get().(*session) + s.dispatcher = d + return s +} + +func (d *Dispatcher) putSession(s *session) { + s.clear() + d.sessionPool.Put(s) +} + +func (d *Dispatcher) getDispatchState() *dispatchState { + ds := d.statePool.Get().(*dispatchState) + + return ds +} + +func (d *Dispatcher) putDispatchState(ds *dispatchState) { + ds.clear() + d.statePool.Put(ds) +} diff --git a/dispatcher/session.go b/dispatcher/session.go new file mode 100644 index 0000000..d75fcf9 --- /dev/null +++ b/dispatcher/session.go @@ -0,0 +1,78 @@ +package dispatcher + +import ( + "log" + + "github.com/hashicorp/go-multierror" +) + +const queueAllocSize = 64 + +type session struct { + dispatcher *Dispatcher + handlers []DispatchHandler + + activeDispatches int + completed chan *dispatchState + + err error +} + +func (s *session) clear() { + s.dispatcher = nil + s.activeDispatches = 0 + s.handlers = nil + s.err = nil + + // Drain the channel + exit := false + for !exit { + select { + case <-s.completed: + log.Printf("Leaked dispatch state discovered!") + continue + default: + exit = true + } + } +} + +func (s *session) ensureParallelism(minParallelism int) { + // Resize the channel to accommodate the parallelism, if necessary. + if cap(s.completed) < minParallelism { + allocSize := ((minParallelism / queueAllocSize) + 1) * queueAllocSize + s.completed = make(chan *dispatchState, allocSize) + } +} + +func (s *session) dispatch() error { + s.ensureParallelism(len(s.handlers)) + + for _, h := range s.handlers { + ds := s.dispatcher.getDispatchState() + ds.handler = h + s.dispatchToHandler(ds) + } + + s.waitForDispatched() + return s.err +} + +func (s *session) dispatchToHandler(ds *dispatchState) { + s.activeDispatches++ + ds.session = s + s.dispatcher.gp.ScheduleWork(ds.invokeHandler, nil) +} + +func (s *session) waitForDispatched() { + for s.activeDispatches > 0 { + state := <-s.completed + s.activeDispatches-- + + if state.err != nil { + s.err = multierror.Append(s.err, state.err) + } + + s.dispatcher.putDispatchState(state) + } +} diff --git a/dispatcher/type.go b/dispatcher/type.go new file mode 100644 index 0000000..ca5761f --- /dev/null +++ b/dispatcher/type.go @@ -0,0 +1,16 @@ +package dispatcher + +import ( + "context" + "google.golang.org/grpc" +) + +type ( + NewClientFunc func(*grpc.ClientConn) interface{} + WorkerFunc func(interface{}) error +) + +type Service struct { + Ctx context.Context + Call func(*grpc.ClientConn) interface{} +} diff --git a/go.mod b/go.mod index 0a7fafe..a616595 100644 --- a/go.mod +++ b/go.mod @@ -2,11 +2,15 @@ module github.com/hb-go/pkg require ( github.com/bsm/redis-lock v8.0.0+incompatible + github.com/fatih/color v1.7.0 github.com/go-redis/redis v6.15.2+incompatible github.com/go-redis/redis_rate v6.5.0+incompatible - github.com/go-redsync/redsync v1.1.1 + github.com/go-redsync/redsync v1.2.0 github.com/gomodule/redigo v2.0.0+incompatible + github.com/hashicorp/go-multierror v1.0.0 + github.com/mattn/go-colorable v0.1.2 // indirect github.com/onsi/ginkgo v1.8.0 // indirect github.com/onsi/gomega v1.5.0 // indirect golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 + google.golang.org/grpc v1.22.1 ) diff --git a/go.sum b/go.sum index 25bd79b..7e3aba6 100644 --- a/go.sum +++ b/go.sum @@ -1,19 +1,36 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/bsm/redis-lock v8.0.0+incompatible h1:QgB0J2pNG8hUfndTIvpPh38F5XsUTTvO7x8Sls++9Mk= github.com/bsm/redis-lock v8.0.0+incompatible/go.mod h1:8dGkQ5GimBCahwF2R67tqGCJbyDZSp0gzO7wq3pDrik= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= +github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/go-redis/redis v6.15.2+incompatible h1:9SpNVG76gr6InJGxoZ6IuuxaCOQwDAhzyXg+Bs+0Sb4= github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-redis/redis_rate v6.5.0+incompatible h1:K/G+KaoJgO3kbkLLbfdg0kzJsHhhk0gVGTMgstKgbsM= github.com/go-redis/redis_rate v6.5.0+incompatible/go.mod h1:Jxe7BhQuVncH6fUQ2rwoAkc8SesjCGIWkm6fNRQo4Qg= -github.com/go-redsync/redsync v1.1.1 h1:26b9SCeW9yz2VaBP1qSpSNMnpSDBPOw21VYzNe+AHgI= -github.com/go-redsync/redsync v1.1.1/go.mod h1:QClK/s99KRhfKdpxLTMsI5mSu43iLp0NfOneLPie+78= +github.com/go-redsync/redsync v1.2.0 h1:a4y3xKQUOA5092Grjps3F5vaRbjA9uoUB59RVwOMttA= +github.com/go-redsync/redsync v1.2.0/go.mod h1:QClK/s99KRhfKdpxLTMsI5mSu43iLp0NfOneLPie+78= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o= +github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU= +github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE= +github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -21,16 +38,34 @@ github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM= github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/grpc v1.22.1 h1:/7cs52RnTJmD43s3uxzlq2U7nqVTd/37viQwMrMNlOM= +google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= @@ -39,3 +74,4 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/log/default.go b/log/default.go new file mode 100644 index 0000000..48ab29e --- /dev/null +++ b/log/default.go @@ -0,0 +1,95 @@ +package log + +import ( + "fmt" + "log" + "os" +) + +type defaultLogger struct { + *log.Logger + calldepth int +} + +func NewLogger() *defaultLogger { + return &defaultLogger{ + Logger: log.New(os.Stderr, "", log.Ldate|log.Ltime|log.Lmicroseconds|log.Llongfile), + calldepth: 3, + } +} + +func (l *defaultLogger) SetCalldepth(calldepth int) { + l.calldepth = calldepth +} + +func (l *defaultLogger) Debug(v ...interface{}) { + l.output(DEBUG, v...) +} + +func (l *defaultLogger) Debugf(format string, v ...interface{}) { + l.outputf(DEBUG, format, v...) +} + +func (l *defaultLogger) Info(v ...interface{}) { + l.output(INFO, v...) +} + +func (l *defaultLogger) Infof(format string, v ...interface{}) { + l.outputf(INFO, format, v...) +} + +func (l *defaultLogger) Warn(v ...interface{}) { + l.output(WARN, v...) +} + +func (l *defaultLogger) Warnf(format string, v ...interface{}) { + l.outputf(WARN, format, v...) +} + +func (l *defaultLogger) Error(v ...interface{}) { + l.output(ERROR, v...) +} + +func (l *defaultLogger) Errorf(format string, v ...interface{}) { + l.outputf(ERROR, format, v...) +} + +func (l *defaultLogger) Fatal(v ...interface{}) { + l.output(fatalLvl, v...) + os.Exit(1) +} + +func (l *defaultLogger) Fatalf(format string, v ...interface{}) { + l.outputf(fatalLvl, format, v...) + os.Exit(1) +} + +func (l *defaultLogger) Panic(v ...interface{}) { + s := fmt.Sprint(v...) + l.output(panicLvl, s) + panic(s) +} + +func (l *defaultLogger) Panicf(format string, v ...interface{}) { + s := fmt.Sprintf(format, v...) + l.output(panicLvl, s) + panic(s) +} + +func (l *defaultLogger) output(lvl Lvl, v ...interface{}) { + if lvl < level { + return + } + l.Output(l.calldepth, header(lvl, fmt.Sprint(v...))) +} + +func (l *defaultLogger) outputf(lvl Lvl, format string, v ...interface{}) { + if lvl < level { + return + } + l.Output(l.calldepth, header(lvl, fmt.Sprintf(format, v...))) +} + +func header(lvl Lvl, msg string) string { + return fmt.Sprintf("[%s] %s", lvl.String(), msg) +} diff --git a/log/logger.go b/log/logger.go new file mode 100644 index 0000000..e460def --- /dev/null +++ b/log/logger.go @@ -0,0 +1,125 @@ +package log + +import ( + "io" + + "github.com/fatih/color" +) + +const ( + DEBUG Lvl = iota + INFO + WARN + ERROR + OFF + fatalLvl + panicLvl +) + +var ( + level = DEBUG + colorEnable = true + global = NewLogger() +) + +func init() { + global.SetCalldepth(4) +} + +type ( + Lvl uint + colorFunc func(format string, a ...interface{}) string +) + +func (lvl Lvl) String() string { + switch lvl { + case DEBUG: + return lvl.colorString("DEBUG", color.WhiteString) + case INFO: + return lvl.colorString("INFO", color.GreenString) + case WARN: + return lvl.colorString("WARN", color.YellowString) + case ERROR: + return lvl.colorString("ERROR", color.RedString) + case fatalLvl: + return lvl.colorString("FATAL", color.HiRedString) + case panicLvl: + return lvl.colorString("PANIC", color.HiRedString) + default: + return lvl.colorString("-", color.WhiteString) + } +} + +func (lvl Lvl) colorString(str string, f colorFunc) string { + if colorEnable { + return f(str) + } else { + return str + } +} + +func SetLevel(lvl Lvl) { + level = lvl +} + +func SetColor(enable bool) { + colorEnable = enable +} + +func SetPrefix(prefix string) { + global.SetPrefix(prefix) +} + +func SetOutput(w io.Writer) { + global.SetOutput(w) +} + +func SetFlags(flag int) { + global.SetFlags(flag) +} + +func SetCalldepth(calldepth int) { + global.SetCalldepth(calldepth) +} + +func Debug(v ...interface{}) { + global.Debug(v...) +} +func Debugf(format string, v ...interface{}) { + global.Debugf(format, v...) +} + +func Info(v ...interface{}) { + global.Info(v...) +} +func Infof(format string, v ...interface{}) { + global.Infof(format, v...) +} + +func Warn(v ...interface{}) { + global.Warn(v...) +} +func Warnf(format string, v ...interface{}) { + global.Warnf(format, v...) +} + +func Error(v ...interface{}) { + global.Error(v...) +} +func Errorf(format string, v ...interface{}) { + global.Errorf(format, v...) +} + +func Fatal(v ...interface{}) { + global.Fatal(v...) +} +func Fatalf(format string, v ...interface{}) { + global.Fatalf(format, v...) +} + +func Panic(v ...interface{}) { + global.Panic(v...) +} +func Panicf(format string, v ...interface{}) { + global.Panicf(format, v...) +} diff --git a/pool/README.md b/pool/README.md new file mode 100644 index 0000000..b021815 --- /dev/null +++ b/pool/README.md @@ -0,0 +1,2 @@ +# Goroutine Pool +fork: from github.com/istio/istio/mixer/pkg/runtime/pool \ No newline at end of file diff --git a/pool/buffer.go b/pool/buffer.go new file mode 100644 index 0000000..4fc213b --- /dev/null +++ b/pool/buffer.go @@ -0,0 +1,36 @@ +// Copyright 2016 Istio Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package pool provides access to a mixer-global pool of buffers, a pool of goroutines, and +// a string interning table. +package pool + +import ( + "bytes" + "sync" +) + +var bufferPool = sync.Pool{New: func() interface{} { return new(bytes.Buffer) }} + +// GetBuffer returns a buffer from the buffer pool. +func GetBuffer() *bytes.Buffer { + return bufferPool.Get().(*bytes.Buffer) +} + +// PutBuffer returns a buffer to the buffer pool. You shouldn't reference this buffer +// after it has been returned to the pool, otherwise bad things will happen. +func PutBuffer(b *bytes.Buffer) { + b.Reset() + bufferPool.Put(b) +} diff --git a/pool/buffer_test.go b/pool/buffer_test.go new file mode 100644 index 0000000..ddeeb15 --- /dev/null +++ b/pool/buffer_test.go @@ -0,0 +1,37 @@ +// Copyright 2016 Istio Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pool + +import ( + "testing" +) + +func TestBuffer(t *testing.T) { + b1 := GetBuffer() + b2 := GetBuffer() + b3 := GetBuffer() + + if b1 == nil || b2 == nil || b3 == nil { + t.Errorf("One of the buffers is nil: %v %v %v", b1, b2, b3) + } + + if b1 == b2 || b1 == b3 || b2 == b3 { + t.Errorf("Some of the buffers are equivalent: %v %v %v", b1, b2, b3) + } + + PutBuffer(b1) + PutBuffer(b2) + PutBuffer(b3) +} diff --git a/pool/goroutine.go b/pool/goroutine.go new file mode 100644 index 0000000..01df23b --- /dev/null +++ b/pool/goroutine.go @@ -0,0 +1,109 @@ +// Copyright 2017 Istio Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pool + +import ( + "errors" + "sync" + "time" +) + +// WorkFunc represents a function to invoke from a worker. The parameter is passed on the side +// to avoid creating closures and allocating. +type WorkFunc func(param interface{}) + +// GoroutinePool represents a set of reusable goroutines onto which work can be scheduled. +type GoroutinePool struct { + count int64 + depth int64 + queue chan work // Channel providing the work that needs to be executed + wg sync.WaitGroup // Used to block shutdown until all workers complete + singleThreaded bool // Whether to actually use goroutines or not +} + +type work struct { + fn WorkFunc + param interface{} +} + +// NewGoroutinePool creates a new pool of goroutines to schedule async work. +func NewGoroutinePool(queueDepth int, singleThreaded bool) *GoroutinePool { + gp := &GoroutinePool{ + queue: make(chan work, queueDepth), + singleThreaded: singleThreaded, + } + + gp.AddWorkers(1) + return gp +} + +// Close waits for all goroutines to terminate (and implements io.Closer). +func (gp *GoroutinePool) Close() error { + if !gp.singleThreaded { + close(gp.queue) + gp.wg.Wait() + } + return nil +} + +// ScheduleWork registers the given function to be executed at some point. The given param will +// be supplied to the function during execution. +// +// By making use of the supplied parameter, it is possible to avoid allocation costs when scheduling the +// work function. The caller needs to make sure that function fn only depends on external data through the +// passed in param interface{} and nothing else. +// +// A way to ensure this condition is met, is by passing ScheduleWork a normal named function rather than an +// inline anonymous function. It's easy for code in an anonymous function to have (or gain) an accidental +// reference that causes a closure to silently be created. +func (gp *GoroutinePool) ScheduleWork(fn WorkFunc, param interface{}) { + if gp.singleThreaded { + fn(param) + } else { + gp.queue <- work{fn: fn, param: param} + } +} + +func (gp *GoroutinePool) ScheduleWorkWithTimeout(fn WorkFunc, param interface{}, d time.Duration) error { + if gp.singleThreaded { + fn(param) + } else { + tc := time.NewTimer(d) + select { + case <-tc.C: + return errors.New("timeout") + case gp.queue <- work{fn: fn, param: param}: + return nil + } + } + + return nil +} + +// AddWorkers introduces more goroutines in the worker pool, increasing potential parallelism. +func (gp *GoroutinePool) AddWorkers(numWorkers int) { + if !gp.singleThreaded { + gp.wg.Add(numWorkers) + for i := 0; i < numWorkers; i++ { + go func() { + for work := range gp.queue { + work.fn(work.param) + } + + gp.wg.Done() + }() + } + } +} diff --git a/pool/goroutine_test.go b/pool/goroutine_test.go new file mode 100644 index 0000000..44b8181 --- /dev/null +++ b/pool/goroutine_test.go @@ -0,0 +1,56 @@ +// Copyright 2017 Istio Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pool + +import ( + "sync" + "testing" +) + +func TestWorkerPool(t *testing.T) { + const numWorkers = 123 + const numWorkItems = 456 + + parameterMismatch := false + + for i := 0; i < 2; i++ { + gp := NewGoroutinePool(128, i == 0) + gp.AddWorkers(numWorkers) + + wg := &sync.WaitGroup{} + wg.Add(numWorkItems) + + for i := 0; i < numWorkItems; i++ { + passedParam := i // capture the parameter on stack to avoid closing on the loop variable. + gp.ScheduleWork(func(param interface{}) { + paramI := param.(int) + if paramI != passedParam { + parameterMismatch = true + } + wg.Done() + }, passedParam) + } + + // wait for all the functions to have run + wg.Wait() + + if parameterMismatch { + t.Fatal("Passed parameter was not as expected") + } + + // make sure the pool can be shutdown cleanly + _ = gp.Close() + } +} diff --git a/pool/intern.go b/pool/intern.go new file mode 100644 index 0000000..c9aa272 --- /dev/null +++ b/pool/intern.go @@ -0,0 +1,87 @@ +// Copyright 2016 Istio Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pool + +import ( + "sync" +) + +// stringPool is a container of interned strings. +type stringPool struct { + sync.RWMutex + strings map[string]string + currentSize int + maxSize int +} + +const ( + // TODO: just a guess, tune with real-world use + averageStringLength = 10 + + // TODO: just a guess, tune with real-world use + maxSize = 16384 +) + +var globalStringPool = newStringPool(maxSize) + +// Intern returns a sharable version of the string, allowing the +// parameter's storage to be garbage collected. +func Intern(s string) string { + return globalStringPool.Intern(s) +} + +// newStringPool allocates a new interning pool ready for use. +// +// Go doesn't currently have sophisticated GC primitives, such as weak pointers. +// As a result, a simple string interning solution can easily become subject to +// memory bloating. Strings are only ever added and never removed, leading to +// an eventual OOM condition. Can easily be leveraged to DDoS a server for example. +// +// The code here uses a simple approach to work around this problem. If the table +// holding the interned strings ever holds more than maxSize's worth of strings, +// the table is completely dropped on the floor and a new table is allocated. This +// allows any stale strings pointed to by the old table to be reclaimed by the GC. +// This effectively puts a cap on the memory held by any single pool. The cost of +// this approach of course is that interning will be less efficient. +func newStringPool(maxSize int) *stringPool { + return &stringPool{strings: make(map[string]string, maxSize/averageStringLength), maxSize: maxSize} +} + +// Intern returns a sharable version of the string, allowing the +// parameter's storage to be garbage collected. +func (p *stringPool) Intern(s string) string { + // quick try if its already in the table + p.RLock() + result, found := p.strings[s] + p.RUnlock() + + if !found { + // look again under a serializing r/w lock + p.Lock() + if result, found = p.strings[s]; !found { + if len(s) > p.maxSize-p.currentSize { + p.strings = make(map[string]string, p.maxSize/averageStringLength) + p.currentSize = 0 + } + + p.strings[s] = s + p.currentSize += len(s) + result = s + } + p.Unlock() + } + + return result +} diff --git a/pool/intern_test.go b/pool/intern_test.go new file mode 100644 index 0000000..4a9f842 --- /dev/null +++ b/pool/intern_test.go @@ -0,0 +1,51 @@ +// Copyright 2016 Istio Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pool + +import ( + "testing" +) + +func TestIntern(t *testing.T) { + strings := []string{ + "a", + "b", + "c", + "d", + "e", + "f", + "g", + "h", + } + + // We're only testing the semantics here, we're not actually + // verifying that interning has taken place. That's too hard to + // do in Go... + + p := newStringPool(4) + for _, s := range strings { + r := p.Intern(s) + if r != s { + t.Errorf("Got mismatch %v != %v", r, s) + } + } + + for _, s := range strings { + r := Intern(s) + if r != s { + t.Errorf("Got mismatch %v != %v", r, s) + } + } +} diff --git a/rate/rate.go b/rate/rate.go index eeadb81..ec9270b 100644 --- a/rate/rate.go +++ b/rate/rate.go @@ -2,8 +2,9 @@ package rate import ( "fmt" - "github.com/go-redis/redis" "time" + + "github.com/go-redis/redis" ) var ( @@ -21,7 +22,7 @@ type Limiter struct { opts *Options } -func NewLimiter(opt ... Option) *Limiter { +func NewLimiter(opt ...Option) *Limiter { opts := DefaultOptions() for _, o := range opt { o(opts)