Skip to content

Commit

Permalink
bugfix: prevent segfaults in connection_pool
Browse files Browse the repository at this point in the history
After the patch ConnectionPool.getNextConnection() does not return
(nil, nil). It always return a connection or an error.

The check Connection.ConnectedNow() does not have sence because
the connection may be closed right after the call. The code just
complicates the logic and does not protect against anything.

A chain of two atomic operations IsEmpty() + GetNextConnection()
wrong because it leads too a race condition.

Part of #208
  • Loading branch information
oleg-jukovec committed Aug 24, 2022
1 parent 7645f40 commit 2fc7566
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 59 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Mode type description in the connection_pool subpackage (#208)
- Missed Role type constants in the connection_pool subpackage (#208)
- ConnectionPool does not close UnknownRole connections (#208)
- Segmentation faults in ConnectionPool requests after disconnect (#208)

## [1.8.0] - 2022-08-17

Expand Down
66 changes: 29 additions & 37 deletions connection_pool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,20 @@ func (connPool *ConnectionPool) ConnectedNow(mode Mode) (bool, error) {
if connPool.getState() != connConnected {
return false, nil
}

conn, err := connPool.getNextConnection(mode)
if err != nil || conn == nil {
return false, err
switch mode {
case ANY:
return !connPool.anyPool.IsEmpty(), nil
case RW:
return !connPool.rwPool.IsEmpty(), nil
case RO:
return !connPool.roPool.IsEmpty(), nil
case PreferRW:
fallthrough
case PreferRO:
return !connPool.rwPool.IsEmpty() || !connPool.roPool.IsEmpty(), nil
default:
return false, ErrNoHealthyInstance
}

return conn.ConnectedNow(), nil
}

// ConfiguredTimeout gets timeout of current connection.
Expand Down Expand Up @@ -751,49 +758,34 @@ func (connPool *ConnectionPool) getNextConnection(mode Mode) (*tarantool.Connect

switch mode {
case ANY:
if connPool.anyPool.IsEmpty() {
return nil, ErrNoHealthyInstance
if next := connPool.anyPool.GetNextConnection(); next != nil {
return next, nil
}

return connPool.anyPool.GetNextConnection(), nil

case RW:
if connPool.rwPool.IsEmpty() {
return nil, ErrNoRwInstance
if next := connPool.rwPool.GetNextConnection(); next != nil {
return next, nil
}

return connPool.rwPool.GetNextConnection(), nil

return nil, ErrNoRwInstance
case RO:
if connPool.roPool.IsEmpty() {
return nil, ErrNoRoInstance
if next := connPool.roPool.GetNextConnection(); next != nil {
return next, nil
}

return connPool.roPool.GetNextConnection(), nil

return nil, ErrNoRoInstance
case PreferRW:
if !connPool.rwPool.IsEmpty() {
return connPool.rwPool.GetNextConnection(), nil
if next := connPool.rwPool.GetNextConnection(); next != nil {
return next, nil
}

if !connPool.roPool.IsEmpty() {
return connPool.roPool.GetNextConnection(), nil
if next := connPool.roPool.GetNextConnection(); next != nil {
return next, nil
}

return nil, ErrNoHealthyInstance

case PreferRO:
if !connPool.roPool.IsEmpty() {
return connPool.roPool.GetNextConnection(), nil
if next := connPool.roPool.GetNextConnection(); next != nil {
return next, nil
}

if !connPool.rwPool.IsEmpty() {
return connPool.rwPool.GetNextConnection(), nil
if next := connPool.rwPool.GetNextConnection(); next != nil {
return next, nil
}

return nil, ErrNoHealthyInstance
}

return nil, ErrNoHealthyInstance
}

Expand Down
36 changes: 36 additions & 0 deletions connection_pool/connection_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,42 @@ func TestClose(t *testing.T) {
require.Nil(t, err)
}

func TestRequestOnClosed(t *testing.T) {
server1 := servers[0]
server2 := servers[1]

connPool, err := connection_pool.Connect([]string{server1, server2}, connOpts)
require.Nilf(t, err, "failed to connect")
require.NotNilf(t, connPool, "conn is nil after Connect")

defer connPool.Close()

test_helpers.StopTarantoolWithCleanup(instances[0])
test_helpers.StopTarantoolWithCleanup(instances[1])

args := test_helpers.CheckStatusesArgs{
ConnPool: connPool,
Mode: connection_pool.ANY,
Servers: []string{server1, server2},
ExpectedPoolStatus: false,
ExpectedStatuses: map[string]bool{
server1: false,
server2: false,
},
}
err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, defaultCountRetry, defaultTimeoutRetry)
require.Nil(t, err)

_, err = connPool.Ping(connection_pool.ANY)
require.NotNilf(t, err, "err is nil after Ping")

err = test_helpers.RestartTarantool(&instances[0])
require.Nilf(t, err, "failed to restart tarantool")

err = test_helpers.RestartTarantool(&instances[1])
require.Nilf(t, err, "failed to restart tarantool")
}

func TestCall17(t *testing.T) {
roles := []bool{false, true, false, false, true}

Expand Down
34 changes: 12 additions & 22 deletions connection_pool/round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@ package connection_pool

import (
"sync"
"sync/atomic"

"github.com/tarantool/go-tarantool"
)

type RoundRobinStrategy struct {
conns []*tarantool.Connection
indexByAddr map[string]int
indexByAddr map[string]uint
mutex sync.RWMutex
size int
current uint64
size uint
current uint
}

func (r *RoundRobinStrategy) GetConnByAddr(addr string) *tarantool.Connection {
Expand Down Expand Up @@ -66,29 +65,18 @@ func (r *RoundRobinStrategy) GetNextConnection() *tarantool.Connection {
r.mutex.RLock()
defer r.mutex.RUnlock()

// We want to iterate through the elements in a circular order
// so the first element in cycle is connections[next]
// and the last one is connections[next + length].
next := r.nextIndex()
cycleLen := len(r.conns) + next
for i := next; i < cycleLen; i++ {
idx := i % len(r.conns)
if r.conns[idx].ConnectedNow() {
if i != next {
atomic.StoreUint64(&r.current, uint64(idx))
}
return r.conns[idx]
}
if r.size == 0 {
return nil
}

return nil
return r.conns[r.nextIndex()]
}

func NewEmptyRoundRobin(size int) *RoundRobinStrategy {
return &RoundRobinStrategy{
conns: make([]*tarantool.Connection, 0, size),
indexByAddr: make(map[string]int),
indexByAddr: make(map[string]uint),
size: 0,
current: 0,
}
}

Expand All @@ -105,6 +93,8 @@ func (r *RoundRobinStrategy) AddConn(addr string, conn *tarantool.Connection) {
}
}

func (r *RoundRobinStrategy) nextIndex() int {
return int(atomic.AddUint64(&r.current, uint64(1)) % uint64(len(r.conns)))
func (r *RoundRobinStrategy) nextIndex() uint {
ret := r.current % r.size
r.current++
return ret
}
16 changes: 16 additions & 0 deletions connection_pool/round_robin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,20 @@ func TestRoundRobinAddDuplicateDelete(t *testing.T) {
}
}

func TestRoundRobinGetNextConnection(t *testing.T) {
rr := NewEmptyRoundRobin(10)

addrs := []string{validAddr1, validAddr2}
conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}}

for i, addr := range addrs {
rr.AddConn(addr, conns[i])
}

expectedConns := []*tarantool.Connection{conns[0], conns[1], conns[0], conns[1]}
for i, expected := range expectedConns {
if rr.GetNextConnection() != expected {
t.Errorf("Unexpected connection on %d call", i)
}
}
}

0 comments on commit 2fc7566

Please sign in to comment.