From 2cfa156bca76738d7d05bd5eae0962b3d4d2fd24 Mon Sep 17 00:00:00 2001 From: Sameh Abouel-saad Date: Mon, 18 Nov 2024 04:20:21 +0200 Subject: [PATCH] use atomic.Bool for closed bool --- clients/tfchain-client-go/impl.go | 65 +++++++++++++------------------ 1 file changed, 28 insertions(+), 37 deletions(-) diff --git a/clients/tfchain-client-go/impl.go b/clients/tfchain-client-go/impl.go index ab610e199..369f5cee5 100644 --- a/clients/tfchain-client-go/impl.go +++ b/clients/tfchain-client-go/impl.go @@ -160,28 +160,19 @@ func (m *manager) initializePool() { } } -func (m *manager) createConnection(ctx context.Context, url string) (*poolConn, error) { +func (m *manager) createConnection(url string) (*poolConn, error) { log.Debug().Str("url", url).Msg("attempting to create a new connection") - ctx, cancel := context.WithTimeout(ctx, m.config.ConnectionTimeout) - defer cancel() - - select { - case <-ctx.Done(): - log.Error().Str("url", url).Msg("context done while creating connection") - return nil, ctx.Err() - default: - if conn, meta, err := createSubstrateConn(url); err == nil { - log.Debug().Str("url", url).Msg("created new connection") - return &poolConn{ - conn: conn, - meta: meta, - url: url, - lastUsed: atomic.Int64{}, - inUse: atomic.Bool{}, - }, nil - } else { - log.Error().Str("url", url).Err(err).Msg("failed to create connection") - } + if conn, meta, err := createSubstrateConn(url); err == nil { + log.Debug().Str("url", url).Msg("created new connection") + return &poolConn{ + conn: conn, + meta: meta, + url: url, + lastUsed: atomic.Int64{}, + inUse: atomic.Bool{}, + }, nil + } else { + log.Error().Str("url", url).Err(err).Msg("failed to create connection") } return nil, fmt.Errorf("failed to create connection to %s", url) } @@ -239,11 +230,7 @@ func (m *manager) getHealthyConn() (*poolConn, error) { return ErrNoConnectionsAvailable }, b) - if err != nil { - return nil, err - } - - return conn, nil + return conn, err } func (m *manager) healthChecker() { @@ -303,7 +290,7 @@ func (m *manager) ensureMinConnections() { poolSize := len(m.pool) if poolSize < m.config.MinPoolSize || (poolSize < m.config.MaxPoolSize && poolSize == inUseCount) { - if conn, err := m.createConnection(m.ctx, url); err == nil { + if conn, err := m.createConnection(url); err == nil { m.mu.Lock() m.pool = append(m.pool, conn) m.mu.Unlock() @@ -402,7 +389,7 @@ type Substrate struct { conn *poolConn mgr *manager mu sync.Mutex - closed bool + closed atomic.Bool } func newSubstrate(conn *poolConn, mgr *manager) *Substrate { @@ -434,18 +421,23 @@ func createSubstrateConn(url string) (Conn, Meta, error) { } func (s *Substrate) GetClient() (Conn, Meta, error) { - if s.closed { + if s.closed.Load() { log.Error().Msg("attempted to get client from closed substrate") return nil, nil, fmt.Errorf("substrate connection closed") } - if s.conn.isHealthy() { + s.mu.Lock() + if s.conn != nil && s.conn.isHealthy() { conn := s.conn.conn meta := s.conn.meta s.conn.lastUsed.Store(time.Now().Unix()) + s.mu.Unlock() return conn, meta, nil } - s.conn.inUse.Store(false) + if s.conn != nil { + s.conn.inUse.Store(false) + } + s.mu.Unlock() conn, err := s.mgr.getHealthyConn() if err != nil { @@ -454,22 +446,21 @@ func (s *Substrate) GetClient() (Conn, Meta, error) { } s.mu.Lock() + defer s.mu.Unlock() s.conn = conn - s.mu.Unlock() log.Debug().Str("url", conn.url).Msg("swapped connection") return conn.conn, conn.meta, nil } func (s *Substrate) Release() { - s.mu.Lock() - defer s.mu.Unlock() - - if s.closed { + if !s.closed.CompareAndSwap(false, true) { return } - s.closed = true + + s.mu.Lock() + defer s.mu.Unlock() if s.conn != nil { s.conn.inUse.Store(false)