Skip to content

Commit

Permalink
add dispatcher log and pool pkg
Browse files Browse the repository at this point in the history
  • Loading branch information
hb-chen committed Jul 26, 2019
1 parent d06d538 commit e1748b3
Show file tree
Hide file tree
Showing 18 changed files with 825 additions and 5 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,8 @@
# pkg
go common package

- Feature
- dispatcher
- log
- pool
- rate
2 changes: 2 additions & 0 deletions dispatcher/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Dispatch Goroutine
ref: github.com/istio/istio/mixer/pkg/runtime/dispatcher
17 changes: 17 additions & 0 deletions dispatcher/dispatch_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package dispatcher

type dispatchState struct {
session *session
handler DispatchHandler
err error
}

func (ds *dispatchState) clear() {
ds.session = nil
ds.err = nil
}

func (ds *dispatchState) invokeHandler(p interface{}) {
ds.err = ds.handler(p)
ds.session.completed <- ds
}
62 changes: 62 additions & 0 deletions dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package dispatcher

import (
"sync"

"github.com/hb-go/pkg/pool"
)

type DispatchHandler func(interface{}) error

type Dispatcher struct {
// pool of sessions
sessionPool sync.Pool

// pool of dispatch states
statePool sync.Pool

// pool of goroutines
gp *pool.GoroutinePool
}

func NewDispatcher(handlerGP *pool.GoroutinePool) *Dispatcher {
d := &Dispatcher{
gp: handlerGP,
}

d.sessionPool.New = func() interface{} { return &session{} }
d.statePool.New = func() interface{} { return &dispatchState{} }
return d
}

func (d *Dispatcher) Dispatch(handlers ...DispatchHandler) error {
s := d.getSession()

s.handlers = handlers
err := s.dispatch()

d.putSession(s)
return err
}

func (d *Dispatcher) getSession() *session {
s := d.sessionPool.Get().(*session)
s.dispatcher = d
return s
}

func (d *Dispatcher) putSession(s *session) {
s.clear()
d.sessionPool.Put(s)
}

func (d *Dispatcher) getDispatchState() *dispatchState {
ds := d.statePool.Get().(*dispatchState)

return ds
}

func (d *Dispatcher) putDispatchState(ds *dispatchState) {
ds.clear()
d.statePool.Put(ds)
}
78 changes: 78 additions & 0 deletions dispatcher/session.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package dispatcher

import (
"log"

"github.com/hashicorp/go-multierror"
)

const queueAllocSize = 64

type session struct {
dispatcher *Dispatcher
handlers []DispatchHandler

activeDispatches int
completed chan *dispatchState

err error
}

func (s *session) clear() {
s.dispatcher = nil
s.activeDispatches = 0
s.handlers = nil
s.err = nil

// Drain the channel
exit := false
for !exit {
select {
case <-s.completed:
log.Printf("Leaked dispatch state discovered!")
continue
default:
exit = true
}
}
}

func (s *session) ensureParallelism(minParallelism int) {
// Resize the channel to accommodate the parallelism, if necessary.
if cap(s.completed) < minParallelism {
allocSize := ((minParallelism / queueAllocSize) + 1) * queueAllocSize
s.completed = make(chan *dispatchState, allocSize)
}
}

func (s *session) dispatch() error {
s.ensureParallelism(len(s.handlers))

for _, h := range s.handlers {
ds := s.dispatcher.getDispatchState()
ds.handler = h
s.dispatchToHandler(ds)
}

s.waitForDispatched()
return s.err
}

func (s *session) dispatchToHandler(ds *dispatchState) {
s.activeDispatches++
ds.session = s
s.dispatcher.gp.ScheduleWork(ds.invokeHandler, nil)
}

func (s *session) waitForDispatched() {
for s.activeDispatches > 0 {
state := <-s.completed
s.activeDispatches--

if state.err != nil {
s.err = multierror.Append(s.err, state.err)
}

s.dispatcher.putDispatchState(state)
}
}
16 changes: 16 additions & 0 deletions dispatcher/type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package dispatcher

import (
"context"
"google.golang.org/grpc"
)

type (
NewClientFunc func(*grpc.ClientConn) interface{}
WorkerFunc func(interface{}) error
)

type Service struct {
Ctx context.Context
Call func(*grpc.ClientConn) interface{}
}
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ module github.com/hb-go/pkg

require (
github.com/bsm/redis-lock v8.0.0+incompatible
github.com/fatih/color v1.7.0
github.com/go-redis/redis v6.15.2+incompatible
github.com/go-redis/redis_rate v6.5.0+incompatible
github.com/go-redsync/redsync v1.1.1
github.com/go-redsync/redsync v1.2.0
github.com/gomodule/redigo v2.0.0+incompatible
github.com/hashicorp/go-multierror v1.0.0
github.com/mattn/go-colorable v0.1.2 // indirect
github.com/onsi/ginkgo v1.8.0 // indirect
github.com/onsi/gomega v1.5.0 // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
google.golang.org/grpc v1.22.1
)
40 changes: 38 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,36 +1,71 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/bsm/redis-lock v8.0.0+incompatible h1:QgB0J2pNG8hUfndTIvpPh38F5XsUTTvO7x8Sls++9Mk=
github.com/bsm/redis-lock v8.0.0+incompatible/go.mod h1:8dGkQ5GimBCahwF2R67tqGCJbyDZSp0gzO7wq3pDrik=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/go-redis/redis v6.15.2+incompatible h1:9SpNVG76gr6InJGxoZ6IuuxaCOQwDAhzyXg+Bs+0Sb4=
github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis_rate v6.5.0+incompatible h1:K/G+KaoJgO3kbkLLbfdg0kzJsHhhk0gVGTMgstKgbsM=
github.com/go-redis/redis_rate v6.5.0+incompatible/go.mod h1:Jxe7BhQuVncH6fUQ2rwoAkc8SesjCGIWkm6fNRQo4Qg=
github.com/go-redsync/redsync v1.1.1 h1:26b9SCeW9yz2VaBP1qSpSNMnpSDBPOw21VYzNe+AHgI=
github.com/go-redsync/redsync v1.1.1/go.mod h1:QClK/s99KRhfKdpxLTMsI5mSu43iLp0NfOneLPie+78=
github.com/go-redsync/redsync v1.2.0 h1:a4y3xKQUOA5092Grjps3F5vaRbjA9uoUB59RVwOMttA=
github.com/go-redsync/redsync v1.2.0/go.mod h1:QClK/s99KRhfKdpxLTMsI5mSu43iLp0NfOneLPie+78=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w=
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM=
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.22.1 h1:/7cs52RnTJmD43s3uxzlq2U7nqVTd/37viQwMrMNlOM=
google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
Expand All @@ -39,3 +74,4 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
95 changes: 95 additions & 0 deletions log/default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package log

import (
"fmt"
"log"
"os"
)

type defaultLogger struct {
*log.Logger
calldepth int
}

func NewLogger() *defaultLogger {
return &defaultLogger{
Logger: log.New(os.Stderr, "", log.Ldate|log.Ltime|log.Lmicroseconds|log.Llongfile),
calldepth: 3,
}
}

func (l *defaultLogger) SetCalldepth(calldepth int) {
l.calldepth = calldepth
}

func (l *defaultLogger) Debug(v ...interface{}) {
l.output(DEBUG, v...)
}

func (l *defaultLogger) Debugf(format string, v ...interface{}) {
l.outputf(DEBUG, format, v...)
}

func (l *defaultLogger) Info(v ...interface{}) {
l.output(INFO, v...)
}

func (l *defaultLogger) Infof(format string, v ...interface{}) {
l.outputf(INFO, format, v...)
}

func (l *defaultLogger) Warn(v ...interface{}) {
l.output(WARN, v...)
}

func (l *defaultLogger) Warnf(format string, v ...interface{}) {
l.outputf(WARN, format, v...)
}

func (l *defaultLogger) Error(v ...interface{}) {
l.output(ERROR, v...)
}

func (l *defaultLogger) Errorf(format string, v ...interface{}) {
l.outputf(ERROR, format, v...)
}

func (l *defaultLogger) Fatal(v ...interface{}) {
l.output(fatalLvl, v...)
os.Exit(1)
}

func (l *defaultLogger) Fatalf(format string, v ...interface{}) {
l.outputf(fatalLvl, format, v...)
os.Exit(1)
}

func (l *defaultLogger) Panic(v ...interface{}) {
s := fmt.Sprint(v...)
l.output(panicLvl, s)
panic(s)
}

func (l *defaultLogger) Panicf(format string, v ...interface{}) {
s := fmt.Sprintf(format, v...)
l.output(panicLvl, s)
panic(s)
}

func (l *defaultLogger) output(lvl Lvl, v ...interface{}) {
if lvl < level {
return
}
l.Output(l.calldepth, header(lvl, fmt.Sprint(v...)))
}

func (l *defaultLogger) outputf(lvl Lvl, format string, v ...interface{}) {
if lvl < level {
return
}
l.Output(l.calldepth, header(lvl, fmt.Sprintf(format, v...)))
}

func header(lvl Lvl, msg string) string {
return fmt.Sprintf("[%s] %s", lvl.String(), msg)
}
Loading

0 comments on commit e1748b3

Please sign in to comment.