Skip to content

Commit

Permalink
hostConnPool: introduced ConnPicker interface to abstract pool storage
Browse files Browse the repository at this point in the history
The hostConnPool logic around conns slice is moved to defaultConnPicker.

Co-authored-by: Henrik Johansson <[email protected]>
Co-authored-by: Michał Matczuk <[email protected]>
  • Loading branch information
mmatczuk and Henrik Johansson committed Oct 10, 2018
1 parent 0cabf96 commit 9b76938
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 83 deletions.
4 changes: 4 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,10 @@ func (c *Conn) authenticateHandshake(ctx context.Context, authFrame *authenticat
}

func (c *Conn) closeWithError(err error) {
if c == nil {
return
}

if !atomic.CompareAndSwapInt32(&c.closed, 0, 1) {
return
}
Expand Down
124 changes: 43 additions & 81 deletions connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"math/rand"
"net"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -256,52 +255,51 @@ type hostConnPool struct {
addr string
size int
keyspace string
// protection for conns, closed, filling
mu sync.RWMutex
conns []*Conn
closed bool
filling bool

pos uint32
// protection for connPicker, closed, filling
mu sync.RWMutex
connPicker ConnPicker
closed bool
filling bool
}

func (h *hostConnPool) String() string {
h.mu.RLock()
defer h.mu.RUnlock()
size, _ := h.connPicker.Size()
return fmt.Sprintf("[filling=%v closed=%v conns=%v size=%v host=%v]",
h.filling, h.closed, len(h.conns), h.size, h.host)
h.filling, h.closed, size, h.size, h.host)
}

func newHostConnPool(session *Session, host *HostInfo, port, size int,
keyspace string) *hostConnPool {

pool := &hostConnPool{
session: session,
host: host,
port: port,
addr: (&net.TCPAddr{IP: host.ConnectAddress(), Port: host.Port()}).String(),
size: size,
keyspace: keyspace,
conns: make([]*Conn, 0, size),
filling: false,
closed: false,
session: session,
host: host,
port: port,
addr: (&net.TCPAddr{IP: host.ConnectAddress(), Port: host.Port()}).String(),
size: size,
keyspace: keyspace,
connPicker: nopConnPicker{},
filling: false,
closed: false,
}

// the pool is not filled or connected
return pool
}

// Pick a connection from this connection pool for the given query.
func (pool *hostConnPool) Pick() *Conn {
func (pool *hostConnPool) Pick(token token) *Conn {
pool.mu.RLock()
defer pool.mu.RUnlock()

if pool.closed {
return nil
}

size := len(pool.conns)
if size < pool.size {
size, missing := pool.connPicker.Size()
if missing > 0 {
// try to fill the pool
go pool.fill()

Expand All @@ -310,62 +308,27 @@ func (pool *hostConnPool) Pick() *Conn {
}
}

pos := int(atomic.AddUint32(&pool.pos, 1) - 1)

var (
leastBusyConn *Conn
streamsAvailable int
)

// find the conn which has the most available streams, this is racy
for i := 0; i < size; i++ {
conn := pool.conns[(pos+i)%size]
if streams := conn.AvailableStreams(); streams > streamsAvailable {
leastBusyConn = conn
streamsAvailable = streams
}
}

return leastBusyConn
return pool.connPicker.Pick(token)
}

//Size returns the number of connections currently active in the pool
func (pool *hostConnPool) Size() int {
pool.mu.RLock()
defer pool.mu.RUnlock()

return len(pool.conns)
size, _ := pool.connPicker.Size()
return size
}

//Close the connection pool
func (pool *hostConnPool) Close() {
pool.mu.Lock()
defer pool.mu.Unlock()

if pool.closed {
pool.mu.Unlock()
return
if !pool.closed {
pool.connPicker.Close()
}
pool.closed = true

// ensure we dont try to reacquire the lock in handleError
// TODO: improve this as the following can happen
// 1) we have locked pool.mu write lock
// 2) conn.Close calls conn.closeWithError(nil)
// 3) conn.closeWithError calls conn.Close() which returns an error
// 4) conn.closeWithError calls pool.HandleError with the error from conn.Close
// 5) pool.HandleError tries to lock pool.mu
// deadlock

// empty the pool
conns := pool.conns
pool.conns = nil

pool.mu.Unlock()

// close the connections
for _, conn := range conns {
conn.Close()
}
}

// Fill the connection pool
Expand All @@ -378,8 +341,7 @@ func (pool *hostConnPool) fill() {
}

// determine the filling work to be done
startCount := len(pool.conns)
fillCount := pool.size - startCount
startCount, fillCount := pool.connPicker.Size()

// avoid filling a full (or overfull) pool
if fillCount <= 0 {
Expand All @@ -391,9 +353,7 @@ func (pool *hostConnPool) fill() {
pool.mu.RUnlock()
pool.mu.Lock()

// double check everything since the lock was released
startCount = len(pool.conns)
fillCount = pool.size - startCount
startCount, fillCount = pool.connPicker.Size()
if pool.closed || pool.filling || fillCount <= 0 {
// looks like another goroutine already beat this
// goroutine to the filling
Expand Down Expand Up @@ -427,8 +387,10 @@ func (pool *hostConnPool) fill() {
return
}

// filled one
fillCount--
// filled one, let's reload it to see if it has changed
pool.mu.RLock()
_, fillCount = pool.connPicker.Size()
pool.mu.RUnlock()
}

// fill the rest of the pool asynchronously
Expand Down Expand Up @@ -543,11 +505,21 @@ func (pool *hostConnPool) connect() (err error) {
return nil
}

pool.conns = append(pool.conns, conn)
// lazily initialize the connPicker when we know the required type
pool.initConnPicker(conn)
pool.connPicker.Put(conn)

return nil
}

func (pool *hostConnPool) initConnPicker(conn *Conn) {
if _, ok := pool.connPicker.(nopConnPicker); !ok {
return
}

pool.connPicker = newDefaultConnPicker(pool.size)
}

// handle any error from a Conn
func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
if !closed {
Expand All @@ -565,15 +537,5 @@ func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
return
}

// find the connection index
for i, candidate := range pool.conns {
if candidate == conn {
// remove the connection, not preserving order
pool.conns[i], pool.conns = pool.conns[len(pool.conns)-1], pool.conns[:len(pool.conns)-1]

// lost a connection, so fill the pool
go pool.fill()
break
}
}
pool.connPicker.Remove(conn)
}
111 changes: 111 additions & 0 deletions connpicker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package gocql

import (
"fmt"
"sync"
"sync/atomic"
)

type ConnPicker interface {
Pick(token) *Conn
Put(*Conn)
Remove(conn *Conn)
Size() (int, int)
Close()
}

type defaultConnPicker struct {
conns []*Conn
pos uint32
size int
mu sync.RWMutex
}

func newDefaultConnPicker(size int) *defaultConnPicker {
if size <= 0 {
panic(fmt.Sprintf("invalid pool size %d", size))
}
return &defaultConnPicker{
size: size,
}
}

func (p *defaultConnPicker) Remove(conn *Conn) {
p.mu.Lock()
defer p.mu.Unlock()

for i, candidate := range p.conns {
if candidate == conn {
p.conns[i] = nil
return
}
}
}

func (p *defaultConnPicker) Close() {
p.mu.Lock()
defer p.mu.Unlock()

conns := p.conns
p.conns = nil
for _, conn := range conns {
if conn != nil {
conn.Close()
}
}
}

func (p *defaultConnPicker) Size() (int, int) {
size := len(p.conns)
return size, p.size - size
}

func (p *defaultConnPicker) Pick(token) *Conn {
pos := int(atomic.AddUint32(&p.pos, 1) - 1)
size := len(p.conns)

var (
leastBusyConn *Conn
streamsAvailable int
)

// find the conn which has the most available streams, this is racy
for i := 0; i < size; i++ {
conn := p.conns[(pos+i)%size]
if conn == nil {
continue
}
if streams := conn.AvailableStreams(); streams > streamsAvailable {
leastBusyConn = conn
streamsAvailable = streams
}
}

return leastBusyConn
}

func (p *defaultConnPicker) Put(conn *Conn) {
p.mu.Lock()
p.conns = append(p.conns, conn)
p.mu.Unlock()
}

// nopConnPicker is a no-operation implementation of ConnPicker.
type nopConnPicker struct{}

func (nopConnPicker) Pick(token) *Conn {
return nil
}

func (nopConnPicker) Put(*Conn) {
}

func (nopConnPicker) Remove(conn *Conn) {
}

func (nopConnPicker) Size() (int, int) {
return 0, 1 // 1 makes hostConnPool want to fill the pool
}

func (nopConnPicker) Close() {
}
2 changes: 1 addition & 1 deletion query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (q *queryExecutor) run(qry ExecutableQuery, specWG *sync.WaitGroup, results
continue
}

conn := pool.Pick()
conn := pool.Pick(selectedHost.Token())
if conn == nil {
continue
}
Expand Down
2 changes: 1 addition & 1 deletion session.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ func (s *Session) getConn() *Conn {
pool, ok := s.pool.getPool(host)
if !ok {
continue
} else if conn := pool.Pick(); conn != nil {
} else if conn := pool.Pick(nil); conn != nil {
return conn
}
}
Expand Down

0 comments on commit 9b76938

Please sign in to comment.