Skip to content

Commit

Permalink
[FAB-10197] Fix CachingConnector deadlock
Browse files Browse the repository at this point in the history
Change-Id: If9f406c6ab02b01df193151882971f2d8a1a0f5f
Signed-off-by: Divyank Katira <[email protected]>
  • Loading branch information
d1vyank committed May 18, 2018
1 parent e3515e5 commit f3fee0e
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 158 deletions.
4 changes: 2 additions & 2 deletions pkg/fab/comm/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestMain(m *testing.M) {

go grpcServer.Serve(lis)

srvs, addrs, err := startEndorsers(2, endorserAddress)
srvs, addrs, err := startEndorsers(30, endorserAddress)
if err != nil {
panic(fmt.Sprintf("Error starting endorser %s", err))
}
Expand Down Expand Up @@ -80,7 +80,7 @@ func startEndorserServer(grpcServer *grpc.Server, address string) (*mocks.MockEn

endorserServer := &mocks.MockEndorserServer{}
pb.RegisterEndorserServer(grpcServer, endorserServer)
fmt.Printf("Starting test server on %s", addr)
fmt.Printf("Starting test server on %s\n", addr)
go grpcServer.Serve(lis)
return endorserServer, addr, true
}
223 changes: 88 additions & 135 deletions pkg/fab/comm/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ const (
//
// This component has been designed to be safe for concurrency.
type CachingConnector struct {
conns sync.Map
sweepTime time.Duration
idleTime time.Duration
index map[*grpc.ClientConn]*cachedConn
lock sync.Mutex
conns map[string]*cachedConn
sweepTime time.Duration
idleTime time.Duration
index map[*grpc.ClientConn]*cachedConn
// lock protects concurrent access to the connection cache
// it is held during create, load, release, and sweep connection
// operations. Note: it is released during openConn, which is
// the blocking part of the connection process.
lock sync.RWMutex
waitgroup sync.WaitGroup
janitorChan chan *cachedConn
janitorDone chan bool
janitorClosed chan bool
}
Expand All @@ -45,18 +48,16 @@ type cachedConn struct {
target string
conn *grpc.ClientConn
open int
lastOpen time.Time
lastClose time.Time
}

// NewCachingConnector creates a GRPC connection cache. The cache is governed by
// sweepTime and idleTime.
func NewCachingConnector(sweepTime time.Duration, idleTime time.Duration) *CachingConnector {
cc := CachingConnector{
conns: sync.Map{},
conns: map[string]*cachedConn{},
index: map[*grpc.ClientConn]*cachedConn{},
janitorChan: make(chan *cachedConn),
janitorDone: make(chan bool),
janitorDone: make(chan bool, 1),
janitorClosed: make(chan bool, 1),
sweepTime: sweepTime,
idleTime: idleTime,
Expand All @@ -73,15 +74,15 @@ func NewCachingConnector(sweepTime time.Duration, idleTime time.Duration) *Cachi

// Close cleans up cached connections.
func (cc *CachingConnector) Close() {
cc.lock.Lock()
defer cc.lock.Unlock()

cc.lock.RLock()
// Safety check to see if the connector has been closed. This represents a
// bug in the calling code, but it's not good to panic here.
if cc.janitorDone == nil {
cc.lock.RUnlock()
logger.Warn("Trying to close connector after already closed")
return
}
cc.lock.RUnlock()
logger.Debug("closing caching GRPC connector")

select {
Expand All @@ -93,7 +94,16 @@ func (cc *CachingConnector) Close() {
cc.waitgroup.Wait()
}

close(cc.janitorChan)
cc.lock.Lock()
defer cc.lock.Unlock()

if len(cc.index) > 0 {
logger.Debugf("flushing connection cache with open connections [%d]", len(cc.index))
} else {
logger.Debugf("flushing connection cache")
}

cc.flush()
close(cc.janitorClosed)
close(cc.janitorDone)
cc.janitorDone = nil
Expand All @@ -103,16 +113,23 @@ func (cc *CachingConnector) Close() {
func (cc *CachingConnector) DialContext(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
logger.Debugf("DialContext: %s", target)

cc.lock.Lock()
c, ok := cc.loadConn(target)
if !ok {
createdConn, err := cc.createConn(ctx, target, opts...)
if err != nil {
cc.lock.Unlock()
return nil, errors.WithMessage(err, "connection creation failed")
}
c = createdConn
}

cc.lock.Unlock()

if err := cc.openConn(ctx, c); err != nil {
cc.lock.Lock()
setClosed(c)
cc.lock.Unlock()
return nil, errors.Errorf("dialing connection timed out [%s]", target)
}
return c.conn, nil
Expand Down Expand Up @@ -144,33 +161,27 @@ func (cc *CachingConnector) ReleaseConn(conn *grpc.ClientConn) {
}
logger.Debugf("ReleaseConn [%s]", cconn.target)

if cconn.open > 0 {
cconn.lastClose = time.Now()
cconn.open--
}
setClosed(cconn)

cc.updateJanitor(cconn)
cc.ensureJanitorStarted()
}

func (cc *CachingConnector) loadConn(target string) (*cachedConn, bool) {
connRaw, ok := cc.conns.Load(target)
c, ok := cc.conns[target]
if ok {
c, ok := connRaw.(*cachedConn)
if ok {
if c.conn.GetState() != connectivity.Shutdown {
logger.Debugf("using cached connection [%s: %p]", target, c)
return c, true
}
cc.shutdownConn(c)
if c.conn.GetState() != connectivity.Shutdown {
logger.Debugf("using cached connection [%s: %p]", target, c)
// Set connection open as soon as it is loaded to prevent the janitor
// from sweeping it
c.open++
return c, true
}
cc.shutdownConn(c)
}
return nil, false
}

func (cc *CachingConnector) createConn(ctx context.Context, target string, opts ...grpc.DialOption) (*cachedConn, error) {
cc.lock.Lock()
defer cc.lock.Unlock()

if cc.janitorDone == nil {
return nil, errors.New("caching connector is closed")
}
Expand All @@ -190,8 +201,10 @@ func (cc *CachingConnector) createConn(ctx context.Context, target string, opts
cconn = &cachedConn{
target: target,
conn: conn,
open: 1,
}
cc.conns.Store(target, cconn)

cc.conns[target] = cconn
cc.index[conn] = cconn

return cconn, nil
Expand All @@ -204,11 +217,7 @@ func (cc *CachingConnector) openConn(ctx context.Context, c *cachedConn) error {
return err
}

cc.lock.Lock()
defer cc.lock.Unlock()
c.open++
c.lastOpen = time.Now()
cc.updateJanitor(c)
cc.ensureJanitorStarted()

logger.Debugf("connection was opened [%s]", c.target)
return nil
Expand All @@ -228,157 +237,94 @@ func waitConn(ctx context.Context, conn *grpc.ClientConn, targetState connectivi
}

func (cc *CachingConnector) shutdownConn(cconn *cachedConn) {
cc.lock.Lock()
defer cc.lock.Unlock()

if cc.janitorDone == nil {
logger.Debug("Connector already closed")
return
}

logger.Debugf("connection was shutdown [%s]", cconn.target)
cc.conns.Delete(cconn.target)
delete(cc.conns, cconn.target)
delete(cc.index, cconn.conn)

cconn.open = 0
cconn.lastClose = time.Time{}

cc.updateJanitor(cconn)
cc.ensureJanitorStarted()
}

func (cc *CachingConnector) removeConn(target string) {
cc.lock.Lock()
defer cc.lock.Unlock()

logger.Debugf("removing connection [%s]", target)
connRaw, ok := cc.conns.Load(target)
if ok {
c, ok := connRaw.(*cachedConn)
if ok {
delete(cc.index, c.conn)
cc.conns.Delete(target)
if err := c.conn.Close(); err != nil {
logger.Debugf("unable to close connection [%s]", err)
}
func (cc *CachingConnector) sweepAndRemove() {
now := time.Now()
for conn, cachedConn := range cc.index {
if cachedConn.open == 0 && now.After(cachedConn.lastClose.Add(cc.idleTime)) {
logger.Debugf("connection janitor closing connection [%s]", cachedConn.target)
cc.removeConn(cachedConn)
} else if conn.GetState() == connectivity.Shutdown {
logger.Debugf("connection already closed [%s]", cachedConn.target)
cc.removeConn(cachedConn)
}
}
}

func (cc *CachingConnector) updateJanitor(c *cachedConn) {
func (cc *CachingConnector) removeConn(c *cachedConn) {
logger.Debugf("removing connection [%s]", c.target)
delete(cc.index, c.conn)
delete(cc.conns, c.target)
if err := c.conn.Close(); err != nil {
logger.Debugf("unable to close connection [%s]", err)
}
}

func (cc *CachingConnector) ensureJanitorStarted() {
select {
case <-cc.janitorClosed:
logger.Debugf("janitor not started")
cc.waitgroup.Add(1)
go janitor(cc.sweepTime, cc.idleTime, &cc.waitgroup, cc.janitorChan, cc.janitorClosed, cc.janitorDone, cc.removeConn)
go cc.janitor()
default:
logger.Debugf("janitor already started")
}
cClone := *c

cc.janitorChan <- &cClone
}

// The janitor monitors open connections for shutdown state or extended non-usage.
// janitor monitors open connections for shutdown state or extended non-usage.
// This component operates by running a sweep with a period determined by "sweepTime".
// When a connection returned the GRPC status connectivity.Shutdown or when the connection
// has its usages closed for longer than "idleTime", the connection is closed and the
// "connRemove" notifier is called.
//
// The caching connector:
// pushes connection information via the "conn" go channel.
// notifies the janitor of close by closing the "done" go channel.
//
// The janitor:
// calls "connRemove" callback when closing a connection.
// decrements the "wg" waitgroup when exiting.
// writes to the "done" go channel when closing due to becoming empty.

type connRemoveNotifier func(target string)

func janitor(sweepTime time.Duration, idleTime time.Duration, wg *sync.WaitGroup, conn chan *cachedConn, close chan bool, done chan bool, connRemove connRemoveNotifier) {
func (cc *CachingConnector) janitor() {
logger.Debugf("starting connection janitor")
defer wg.Done()
defer cc.waitgroup.Done()

conns := map[string]*cachedConn{}
ticker := time.NewTicker(sweepTime)
ticker := time.NewTicker(cc.sweepTime)
for {
select {
case <-done:
if len(conns) > 0 {
logger.Debugf("flushing connection janitor with open connections [%d]", len(conns))
} else {
logger.Debugf("flushing connection janitor")
}
flush(conns)
case <-cc.janitorDone:
return
case c := <-conn:
cache(conns, c)
case <-ticker.C:
rm := sweep(conns, idleTime)
for _, target := range rm {
connRemove(target)
delete(conns, target)
}

if len(conns) == 0 {
cc.lock.Lock()
cc.sweepAndRemove()
numConn := len(cc.index)
cc.lock.Unlock()
if numConn == 0 {
logger.Debugf("closing connection janitor")
close <- true
cc.janitorClosed <- true
return
}
}
}
}

func cache(conns map[string]*cachedConn, updateConn *cachedConn) {

c, ok := conns[updateConn.target]
if ok && updateConn.lastClose.IsZero() && updateConn.conn.GetState() == connectivity.Shutdown {
logger.Debugf("connection shutdown detected in connection janitor")
// We need to remove the connection from sweep consideration immediately
// since the connector has already removed it. Otherwise we can have a race
// between shutdown and creating a connection concurrently.
delete(conns, updateConn.target)
return
}

if !ok {
logger.Debugf("new connection in connection janitor")
} else if c.conn != updateConn.conn {
logger.Debugf("connection change in connection janitor")

if err := c.conn.Close(); err != nil {
logger.Debugf("unable to close connection [%s]", err)
}

} else {
logger.Debugf("updating existing connection in connection janitor")
}

conns[updateConn.target] = updateConn
}

func flush(conns map[string]*cachedConn) {
for _, c := range conns {
logger.Debugf("connection janitor closing connection [%s]", c.target)
func (cc *CachingConnector) flush() {
for _, c := range cc.index {
logger.Debugf("flushing connection [%s]", c.target)
closeConn(c.conn)
}
}

func sweep(conns map[string]*cachedConn, idleTime time.Duration) []string {
rm := make([]string, 0, len(conns))
now := time.Now()
for _, c := range conns {
if c.open == 0 && now.After(c.lastClose.Add(idleTime)) {
logger.Debugf("connection janitor closing connection [%s]", c.target)
rm = append(rm, c.target)
} else if c.conn.GetState() == connectivity.Shutdown {
logger.Debugf("connection already closed [%s]", c.target)
rm = append(rm, c.target)
}
}
return rm
}

func closeConn(conn *grpc.ClientConn) {
if err := conn.Close(); err != nil {
logger.Debugf("unable to close connection [%s]", err)
Expand All @@ -390,3 +336,10 @@ func closeConn(conn *grpc.ClientConn) {
}
cancel()
}

func setClosed(cconn *cachedConn) {
if cconn.open > 0 {
cconn.lastClose = time.Now()
cconn.open--
}
}
Loading

0 comments on commit f3fee0e

Please sign in to comment.