Skip to content

Commit

Permalink
Better connection pool handling (#2725)
Browse files Browse the repository at this point in the history
* [fix] etcd config source prefix issue (#2389)

* http transport data race issue (#2436)

* [fix] #2431 http transport data race issue

* [feature] Ability to close connection while receiving.
Ability to send messages while receiving.
Icreased r channel limit to 100 to more fluently communication.
Do not dropp sent request if r channel is full.

* [fix] Use pool connection close timeout

* [fix] replace Close with private function

* [fix] Do not close the transport client twice in stream connection , the transport client is closed in the rpc codec

* [fix] tests

---------

Co-authored-by: Johnson C <[email protected]>
  • Loading branch information
Ak-Army and xpunch authored Jul 23, 2024
1 parent 1c6c1ff commit 0433e98
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 43 deletions.
29 changes: 20 additions & 9 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ var (
DefaultPoolSize = 100
// DefaultPoolTTL sets the connection pool ttl.
DefaultPoolTTL = time.Minute
// DefaultPoolCloseTimeout sets the connection pool colse timeout.
DefaultPoolCloseTimeout = time.Second
)

// Options are the Client options.
Expand Down Expand Up @@ -63,8 +65,9 @@ type Options struct {
Wrappers []Wrapper

// Connection Pool
PoolSize int
PoolTTL time.Duration
PoolSize int
PoolTTL time.Duration
PoolCloseTimeout time.Duration
}

// CallOptions are options used to make calls to a server.
Expand Down Expand Up @@ -140,13 +143,14 @@ func NewOptions(options ...Option) Options {
ConnectionTimeout: DefaultConnectionTimeout,
DialTimeout: transport.DefaultDialTimeout,
},
PoolSize: DefaultPoolSize,
PoolTTL: DefaultPoolTTL,
Broker: broker.DefaultBroker,
Selector: selector.DefaultSelector,
Registry: registry.DefaultRegistry,
Transport: transport.DefaultTransport,
Logger: logger.DefaultLogger,
PoolSize: DefaultPoolSize,
PoolTTL: DefaultPoolTTL,
PoolCloseTimeout: DefaultPoolCloseTimeout,
Broker: broker.DefaultBroker,
Selector: selector.DefaultSelector,
Registry: registry.DefaultRegistry,
Transport: transport.DefaultTransport,
Logger: logger.DefaultLogger,
}

for _, o := range options {
Expand Down Expand Up @@ -191,6 +195,13 @@ func PoolTTL(d time.Duration) Option {
}
}

// PoolCloseTimeout sets the connection pool close timeout.
func PoolCloseTimeout(d time.Duration) Option {
return func(o *Options) {
o.PoolCloseTimeout = d
}
}

// Registry to find nodes for a given service.
func Registry(r registry.Registry) Option {
return func(o *Options) {
Expand Down
6 changes: 5 additions & 1 deletion client/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func newRPCClient(opt ...Option) Client {
pool.Size(opts.PoolSize),
pool.TTL(opts.PoolTTL),
pool.Transport(opts.Transport),
pool.CloseTimeout(opts.PoolCloseTimeout),
)

rc := &rpcClient{
Expand Down Expand Up @@ -148,7 +149,10 @@ func (r *rpcClient) call(

c, err := r.pool.Get(address, dOpts...)
if err != nil {
return merrors.InternalServerError("go.micro.client", "connection error: %v", err)
if c == nil {
return merrors.InternalServerError("go.micro.client", "connection error: %v", err)
}
logger.Log(log.ErrorLevel, "failed to close pool", err)
}

seq := atomic.AddUint64(&r.seq, 1) - 1
Expand Down
8 changes: 8 additions & 0 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,14 @@ func (c *cmd) Before(ctx *cli.Context) error {
clientOpts = append(clientOpts, client.PoolTTL(d))
}

if t := ctx.String("client_pool_close_timeout"); len(t) > 0 {
d, err := time.ParseDuration(t)
if err != nil {
return fmt.Errorf("failed to parse client_pool_close_timeout: %v", t)
}
clientOpts = append(clientOpts, client.PoolCloseTimeout(d))
}

// We have some command line opts for the server.
// Lets set it up
if len(serverOpts) > 0 {
Expand Down
84 changes: 57 additions & 27 deletions util/pool/default.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pool

import (
"errors"
"sync"
"time"

Expand All @@ -12,37 +13,40 @@ import (
type pool struct {
tr transport.Transport

conns map[string][]*poolConn
size int
ttl time.Duration

sync.Mutex
closeTimeout time.Duration
conns map[string][]*poolConn
mu sync.Mutex
size int
ttl time.Duration
}

type poolConn struct {
created time.Time
transport.Client
id string

closeTimeout time.Duration
created time.Time
id string
}

func newPool(options Options) *pool {
return &pool{
size: options.Size,
tr: options.Transport,
ttl: options.TTL,
conns: make(map[string][]*poolConn),
size: options.Size,
tr: options.Transport,
ttl: options.TTL,
closeTimeout: options.CloseTimeout,
conns: make(map[string][]*poolConn),
}
}

func (p *pool) Close() error {
p.Lock()
defer p.Unlock()
p.mu.Lock()
defer p.mu.Unlock()

var err error

for k, c := range p.conns {
for _, conn := range c {
if nerr := conn.Client.Close(); nerr != nil {
if nerr := conn.close(); nerr != nil {
err = nerr
}
}
Expand All @@ -67,7 +71,7 @@ func (p *poolConn) Created() time.Time {
}

func (p *pool) Get(addr string, opts ...transport.DialOption) (Conn, error) {
p.Lock()
p.mu.Lock()
conns := p.conns[addr]

// While we have conns check age and then return one
Expand All @@ -79,51 +83,77 @@ func (p *pool) Get(addr string, opts ...transport.DialOption) (Conn, error) {

// If conn is old kill it and move on
if d := time.Since(conn.Created()); d > p.ttl {
if err := conn.Client.Close(); err != nil {
p.Unlock()
return nil, err
if err := conn.close(); err != nil {
p.mu.Unlock()
c, errConn := p.newConn(addr, opts)
if errConn != nil {
return nil, errConn
}
return c, err
}

continue
}

// We got a good conn, lets unlock and return it
p.Unlock()
p.mu.Unlock()

return conn, nil
}

p.Unlock()
p.mu.Unlock()

return p.newConn(addr, opts)
}

func (p *pool) newConn(addr string, opts []transport.DialOption) (Conn, error) {
// create new conn
c, err := p.tr.Dial(addr, opts...)
if err != nil {
return nil, err
}

return &poolConn{
Client: c,
id: uuid.New().String(),
created: time.Now(),
Client: c,
id: uuid.New().String(),
closeTimeout: p.closeTimeout,
created: time.Now(),
}, nil
}

func (p *pool) Release(conn Conn, err error) error {
// don't store the conn if it has errored
if err != nil {
return conn.(*poolConn).Client.Close()
return conn.(*poolConn).close()
}

// otherwise put it back for reuse
p.Lock()
defer p.Unlock()
p.mu.Lock()
defer p.mu.Unlock()

conns := p.conns[conn.Remote()]
if len(conns) >= p.size {
return conn.(*poolConn).Client.Close()
return conn.(*poolConn).close()
}

p.conns[conn.Remote()] = append(conns, conn.(*poolConn))

return nil
}

func (p *poolConn) close() error {
ch := make(chan error)
go func() {
defer close(ch)
ch <- p.Client.Close()
}()
t := time.NewTimer(p.closeTimeout)
var err error
select {
case <-t.C:
err = errors.New("unable to close in time")
case err = <-ch:
t.Stop()
}
return err
}
6 changes: 3 additions & 3 deletions util/pool/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ func testPool(t *testing.T, size int, ttl time.Duration) {
// release the conn
p.Release(c, nil)

p.Lock()
p.mu.Lock()
if i := len(p.conns[l.Addr()]); i > size {
p.Unlock()
p.mu.Unlock()
t.Fatalf("pool size %d is greater than expected %d", i, size)
}
p.Unlock()
p.mu.Unlock()
}
}

Expand Down
13 changes: 10 additions & 3 deletions util/pool/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
)

type Options struct {
Transport transport.Transport
TTL time.Duration
Size int
Transport transport.Transport
TTL time.Duration
CloseTimeout time.Duration
Size int
}

type Option func(*Options)
Expand All @@ -31,3 +32,9 @@ func TTL(t time.Duration) Option {
o.TTL = t
}
}

func CloseTimeout(t time.Duration) Option {
return func(o *Options) {
o.CloseTimeout = t
}
}

0 comments on commit 0433e98

Please sign in to comment.