Skip to content

Commit

Permalink
Do not create new addrConn when connection error happens (#1369)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Jul 27, 2017
1 parent 284c78b commit 5ad7775
Showing 1 changed file with 60 additions and 32 deletions.
92 changes: 60 additions & 32 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,15 +559,16 @@ func (cc *ClientConn) scWatcher() {
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
// If tearDownErr is nil, errConnDrain will be used instead.
//
// We should never need to replace an addrConn with a new one. This function is only used
// as newAddrConn to create new addrConn.
// TODO rename this function and clean up the code.
func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error {
ac := &addrConn{
cc: cc,
addr: addr,
dopts: cc.dopts,
}
cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = cc.mkp
cc.mu.RUnlock()
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
ac.stateCV = sync.NewCond(&ac.mu)
if EnableTracing {
Expand Down Expand Up @@ -598,10 +599,7 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error)
cc.mu.Unlock()
if stale != nil {
// There is an addrConn alive on ac.addr already. This could be due to
// 1) a buggy Balancer notifies duplicated Addresses;
// 2) goaway was received, a new ac will replace the old ac.
// The old ac should be deleted from cc.conns, but the
// underlying transport should drain rather than close.
// a buggy Balancer that reports duplicated Addresses.
if tearDownErr == nil {
// tearDownErr is nil if resetAddrConn is called by
// 1) Dial
Expand Down Expand Up @@ -828,26 +826,44 @@ func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState Connecti
return ac.state, nil
}

func (ac *addrConn) resetTransport(closeTransport bool) error {
// resetTransport recreates a transport to the address for ac.
// For the old transport:
// - if drain is true, it will be gracefully closed.
// - otherwise, it will be closed.
func (ac *addrConn) resetTransport(drain bool) error {
ac.mu.Lock()
if ac.state == Shutdown {
ac.mu.Unlock()
return errConnClosing
}
ac.printf("connecting")
if ac.down != nil {
ac.down(downErrorf(false, true, "%v", errNetworkIO))
ac.down = nil
}
ac.state = Connecting
ac.stateCV.Broadcast()
t := ac.transport
ac.transport = nil
ac.mu.Unlock()
if t != nil {
if drain {
t.GracefulClose()
} else {
t.Close()
}
}
ac.cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
ac.cc.mu.RUnlock()
for retries := 0; ; retries++ {
ac.mu.Lock()
ac.printf("connecting")
if ac.state == Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
return errConnClosing
}
if ac.down != nil {
ac.down(downErrorf(false, true, "%v", errNetworkIO))
ac.down = nil
}
ac.state = Connecting
ac.stateCV.Broadcast()
t := ac.transport
ac.mu.Unlock()
if closeTransport && t != nil {
t.Close()
}
sleepTime := ac.dopts.bs.backoff(retries)
timeout := minConnectTimeout
if timeout < sleepTime {
Expand Down Expand Up @@ -883,7 +899,6 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
ac.ready = nil
}
ac.mu.Unlock()
closeTransport = false
timer := time.NewTimer(sleepTime - time.Since(connectTime))
select {
case <-timer.C:
Expand Down Expand Up @@ -936,28 +951,40 @@ func (ac *addrConn) transportMonitor() {
return
case <-t.GoAway():
ac.adjustParams(t.GetGoAwayReason())
// If GoAway happens without any network I/O error, ac is closed without shutting down the
// underlying transport (the transport will be closed when all the pending RPCs finished or
// failed.).
// If GoAway and some network I/O error happen concurrently, ac and its underlying transport
// are closed.
// In both cases, a new ac is created.
// If GoAway happens without any network I/O error, the underlying transport
// will be gracefully closed, and a new transport will be created.
// (The transport will be closed when all the pending RPCs finished or failed.)
// If GoAway and some network I/O error happen concurrently, the underlying transport
// will be closed, and a new transport will be created.
var drain bool
select {
case <-t.Error():
ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
default:
ac.cc.resetAddrConn(ac.addr, false, errConnDrain)
drain = true
}
if err := ac.resetTransport(drain); err != nil {
grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
}
return
}
return
case <-t.Error():
select {
case <-ac.ctx.Done():
t.Close()
return
case <-t.GoAway():
ac.adjustParams(t.GetGoAwayReason())
ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
return
if err := ac.resetTransport(false); err != nil {
grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
}
return
}
default:
}
ac.mu.Lock()
Expand All @@ -969,7 +996,8 @@ func (ac *addrConn) transportMonitor() {
ac.state = TransientFailure
ac.stateCV.Broadcast()
ac.mu.Unlock()
if err := ac.resetTransport(true); err != nil {
if err := ac.resetTransport(false); err != nil {
grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
ac.mu.Lock()
ac.printf("transport exiting: %v", err)
ac.mu.Unlock()
Expand Down

0 comments on commit 5ad7775

Please sign in to comment.