Skip to content

Commit

Permalink
use atomic.Bool for closed bool
Browse files Browse the repository at this point in the history
  • Loading branch information
sameh-farouk committed Nov 18, 2024
1 parent 3b6ec40 commit 2cfa156
Showing 1 changed file with 28 additions and 37 deletions.
65 changes: 28 additions & 37 deletions clients/tfchain-client-go/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,28 +160,19 @@ func (m *manager) initializePool() {
}
}

func (m *manager) createConnection(ctx context.Context, url string) (*poolConn, error) {
func (m *manager) createConnection(url string) (*poolConn, error) {
log.Debug().Str("url", url).Msg("attempting to create a new connection")
ctx, cancel := context.WithTimeout(ctx, m.config.ConnectionTimeout)
defer cancel()

select {
case <-ctx.Done():
log.Error().Str("url", url).Msg("context done while creating connection")
return nil, ctx.Err()
default:
if conn, meta, err := createSubstrateConn(url); err == nil {
log.Debug().Str("url", url).Msg("created new connection")
return &poolConn{
conn: conn,
meta: meta,
url: url,
lastUsed: atomic.Int64{},
inUse: atomic.Bool{},
}, nil
} else {
log.Error().Str("url", url).Err(err).Msg("failed to create connection")
}
if conn, meta, err := createSubstrateConn(url); err == nil {
log.Debug().Str("url", url).Msg("created new connection")
return &poolConn{
conn: conn,
meta: meta,
url: url,
lastUsed: atomic.Int64{},
inUse: atomic.Bool{},
}, nil
} else {
log.Error().Str("url", url).Err(err).Msg("failed to create connection")
}
return nil, fmt.Errorf("failed to create connection to %s", url)
}
Expand Down Expand Up @@ -239,11 +230,7 @@ func (m *manager) getHealthyConn() (*poolConn, error) {
return ErrNoConnectionsAvailable
}, b)

if err != nil {
return nil, err
}

return conn, nil
return conn, err
}

func (m *manager) healthChecker() {
Expand Down Expand Up @@ -303,7 +290,7 @@ func (m *manager) ensureMinConnections() {
poolSize := len(m.pool)

if poolSize < m.config.MinPoolSize || (poolSize < m.config.MaxPoolSize && poolSize == inUseCount) {
if conn, err := m.createConnection(m.ctx, url); err == nil {
if conn, err := m.createConnection(url); err == nil {
m.mu.Lock()
m.pool = append(m.pool, conn)
m.mu.Unlock()
Expand Down Expand Up @@ -402,7 +389,7 @@ type Substrate struct {
conn *poolConn
mgr *manager
mu sync.Mutex
closed bool
closed atomic.Bool
}

func newSubstrate(conn *poolConn, mgr *manager) *Substrate {
Expand Down Expand Up @@ -434,18 +421,23 @@ func createSubstrateConn(url string) (Conn, Meta, error) {
}

func (s *Substrate) GetClient() (Conn, Meta, error) {
if s.closed {
if s.closed.Load() {
log.Error().Msg("attempted to get client from closed substrate")
return nil, nil, fmt.Errorf("substrate connection closed")
}

if s.conn.isHealthy() {
s.mu.Lock()
if s.conn != nil && s.conn.isHealthy() {
conn := s.conn.conn
meta := s.conn.meta
s.conn.lastUsed.Store(time.Now().Unix())
s.mu.Unlock()
return conn, meta, nil
}
s.conn.inUse.Store(false)
if s.conn != nil {
s.conn.inUse.Store(false)
}
s.mu.Unlock()

conn, err := s.mgr.getHealthyConn()
if err != nil {
Expand All @@ -454,22 +446,21 @@ func (s *Substrate) GetClient() (Conn, Meta, error) {
}

s.mu.Lock()
defer s.mu.Unlock()

s.conn = conn
s.mu.Unlock()

log.Debug().Str("url", conn.url).Msg("swapped connection")
return conn.conn, conn.meta, nil
}

func (s *Substrate) Release() {
s.mu.Lock()
defer s.mu.Unlock()

if s.closed {
if !s.closed.CompareAndSwap(false, true) {
return
}
s.closed = true

s.mu.Lock()
defer s.mu.Unlock()

if s.conn != nil {
s.conn.inUse.Store(false)
Expand Down

0 comments on commit 2cfa156

Please sign in to comment.