Skip to content

Commit

Permalink
Used callback to log resource pool wait time
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd Cabancla <[email protected]>
  • Loading branch information
lcabancla committed Feb 13, 2020
1 parent 5d172c4 commit 6b8b6b4
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 28 deletions.
14 changes: 4 additions & 10 deletions go/pools/resource_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"sync"
"time"

"vitess.io/vitess/go/stats"

"golang.org/x/net/context"

"vitess.io/vitess/go/sync2"
Expand Down Expand Up @@ -73,8 +71,7 @@ type ResourcePool struct {
resources chan resourceWrapper
factory Factory
idleTimer *timer.Timer
waitStats *stats.Timings
name string
logWait func(time.Time)
}

type resourceWrapper struct {
Expand All @@ -93,18 +90,17 @@ type resourceWrapper struct {
// An idleTimeout of 0 means that there is no timeout.
// A non-zero value of prefillParallelism causes the pool to be pre-filled.
// The value specifies how many resources can be opened in parallel.
func NewResourcePool(name string, factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int, waitStats *stats.Timings) *ResourcePool {
func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int, logWait func(time.Time)) *ResourcePool {
if capacity <= 0 || maxCap <= 0 || capacity > maxCap {
panic(errors.New("invalid/out of range capacity"))
}
rp := &ResourcePool{
name: name,
resources: make(chan resourceWrapper, maxCap),
factory: factory,
available: sync2.NewAtomicInt64(int64(capacity)),
capacity: sync2.NewAtomicInt64(int64(capacity)),
idleTimeout: sync2.NewAtomicDuration(idleTimeout),
waitStats: waitStats,
logWait: logWait,
}
for i := 0; i < capacity; i++ {
rp.resources <- resourceWrapper{}
Expand Down Expand Up @@ -331,9 +327,7 @@ func (rp *ResourcePool) SetCapacity(capacity int) error {
func (rp *ResourcePool) recordWait(start time.Time) {
rp.waitCount.Add(1)
rp.waitTime.Add(time.Since(start))
if rp.name != "" {
rp.waitStats.Record(rp.name+"ResourceWaitTime", start)
}
rp.logWait(start)
}

// SetIdleTimeout sets the idle timeout. It can only be used if there was an
Expand Down
53 changes: 37 additions & 16 deletions go/pools/resource_pool_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@ import (
"testing"
"time"

"vitess.io/vitess/go/stats"

"golang.org/x/net/context"
"vitess.io/vitess/go/sync2"
)

var lastID, count sync2.AtomicInt64
var waitTimes = stats.NewTimings("", "", "")
var waitStarts []time.Time

type TestResource struct {
num int64
Expand All @@ -42,6 +40,10 @@ func (tr *TestResource) Close() {
}
}

func logWait(start time.Time) {
waitStarts = append(waitStarts, start)
}

func PoolFactory() (Resource, error) {
count.Add(1)
return &TestResource{lastID.Add(1), false}, nil
Expand All @@ -60,7 +62,9 @@ func TestOpen(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool("", PoolFactory, 6, 6, time.Second, 0, waitTimes)
waitStarts = waitStarts[:0]

p := NewResourcePool(PoolFactory, 6, 6, time.Second, 0, logWait)
p.SetCapacity(5)
var resources [10]Resource

Expand All @@ -77,6 +81,9 @@ func TestOpen(t *testing.T) {
if p.WaitCount() != 0 {
t.Errorf("expecting 0, received %d", p.WaitCount())
}
if len(waitStarts) != 0 {
t.Errorf("expecting 0, received %d", len(waitStarts))
}
if p.WaitTime() != 0 {
t.Errorf("expecting 0, received %d", p.WaitTime())
}
Expand Down Expand Up @@ -112,6 +119,15 @@ func TestOpen(t *testing.T) {
if p.WaitCount() != 5 {
t.Errorf("Expecting 5, received %d", p.WaitCount())
}
if int64(len(waitStarts)) != p.WaitCount() {
t.Errorf("expecting %d, received %d", p.WaitCount(), len(waitStarts))
}
// verify start times are monotonic increasing
for i := 1; i < len(waitStarts); i++ {
if waitStarts[i].Before(waitStarts[i-1]) {
t.Errorf("Expecting monotonic increasing start times")
}
}
if p.WaitTime() == 0 {
t.Errorf("Expecting non-zero")
}
Expand Down Expand Up @@ -201,12 +217,12 @@ func TestOpen(t *testing.T) {
func TestPrefill(t *testing.T) {
lastID.Set(0)
count.Set(0)
p := NewResourcePool("", PoolFactory, 5, 5, time.Second, 1, waitTimes)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 1, logWait)
defer p.Close()
if p.Active() != 5 {
t.Errorf("p.Active(): %d, want 5", p.Active())
}
p = NewResourcePool("", FailFactory, 5, 5, time.Second, 1, waitTimes)
p = NewResourcePool(FailFactory, 5, 5, time.Second, 1, logWait)
defer p.Close()
if p.Active() != 0 {
t.Errorf("p.Active(): %d, want 0", p.Active())
Expand All @@ -221,7 +237,7 @@ func TestPrefillTimeout(t *testing.T) {
defer func() { prefillTimeout = saveTimeout }()

start := time.Now()
p := NewResourcePool("", SlowFailFactory, 5, 5, time.Second, 1, waitTimes)
p := NewResourcePool(SlowFailFactory, 5, 5, time.Second, 1, logWait)
defer p.Close()
if elapsed := time.Since(start); elapsed > 20*time.Millisecond {
t.Errorf("elapsed: %v, should be around 10ms", elapsed)
Expand All @@ -235,7 +251,9 @@ func TestShrinking(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool("", PoolFactory, 5, 5, time.Second, 0, waitTimes)
waitStarts = waitStarts[:0]

p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0, logWait)
var resources [10]Resource
// Leave one empty slot in the pool
for i := 0; i < 4; i++ {
Expand Down Expand Up @@ -318,6 +336,9 @@ func TestShrinking(t *testing.T) {
if p.WaitCount() != 1 {
t.Errorf("Expecting 1, received %d", p.WaitCount())
}
if int64(len(waitStarts)) != p.WaitCount() {
t.Errorf("Expecting %d, received %d", p.WaitCount(), len(waitStarts))
}
if count.Get() != 2 {
t.Errorf("Expecting 2, received %d", count.Get())
}
Expand Down Expand Up @@ -374,7 +395,7 @@ func TestClosing(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool("", PoolFactory, 5, 5, time.Second, 0, waitTimes)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0, logWait)
var resources [10]Resource
for i := 0; i < 5; i++ {
r, err := p.Get(ctx)
Expand Down Expand Up @@ -428,7 +449,7 @@ func TestIdleTimeout(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool("", PoolFactory, 1, 1, 10*time.Millisecond, 0, waitTimes)
p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0, logWait)
defer p.Close()

r, err := p.Get(ctx)
Expand Down Expand Up @@ -539,7 +560,7 @@ func TestIdleTimeoutCreateFail(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool("", PoolFactory, 1, 1, 10*time.Millisecond, 0, waitTimes)
p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0, logWait)
defer p.Close()
r, err := p.Get(ctx)
if err != nil {
Expand All @@ -560,7 +581,7 @@ func TestCreateFail(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool("", FailFactory, 5, 5, time.Second, 0, waitTimes)
p := NewResourcePool(FailFactory, 5, 5, time.Second, 0, logWait)
defer p.Close()
if _, err := p.Get(ctx); err.Error() != "Failed" {
t.Errorf("Expecting Failed, received %v", err)
Expand All @@ -576,7 +597,7 @@ func TestCreateFailOnPut(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool("", PoolFactory, 5, 5, time.Second, 0, waitTimes)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0, logWait)
defer p.Close()
_, err := p.Get(ctx)
if err != nil {
Expand All @@ -593,7 +614,7 @@ func TestSlowCreateFail(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool("", SlowFailFactory, 2, 2, time.Second, 0, waitTimes)
p := NewResourcePool(SlowFailFactory, 2, 2, time.Second, 0, logWait)
defer p.Close()
ch := make(chan bool)
// The third Get should not wait indefinitely
Expand All @@ -615,7 +636,7 @@ func TestTimeout(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool("", PoolFactory, 1, 1, time.Second, 0, waitTimes)
p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0, logWait)
defer p.Close()
r, err := p.Get(ctx)
if err != nil {
Expand All @@ -634,7 +655,7 @@ func TestTimeout(t *testing.T) {
func TestExpired(t *testing.T) {
lastID.Set(0)
count.Set(0)
p := NewResourcePool("", PoolFactory, 1, 1, time.Second, 0, waitTimes)
p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0, logWait)
defer p.Close()
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second))
r, err := p.Get(ctx)
Expand Down
11 changes: 10 additions & 1 deletion go/vt/dbconnpool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (cp *ConnectionPool) Open(info *mysql.ConnParams, mysqlStats *stats.Timings
defer cp.mu.Unlock()
cp.info = info
cp.mysqlStats = mysqlStats
cp.connections = pools.NewResourcePool(cp.name, cp.connect, cp.capacity, cp.capacity, cp.idleTimeout, 0, mysqlStats)
cp.connections = pools.NewResourcePool(cp.connect, cp.capacity, cp.capacity, cp.idleTimeout, 0, cp.getLogWaitCallback())
// Check if we need to resolve a hostname (The Host is not just an IP address).
if cp.resolutionFrequency > 0 && net.ParseIP(info.Host) == nil {
cp.hostIsNotIP = true
Expand All @@ -169,6 +169,15 @@ func (cp *ConnectionPool) Open(info *mysql.ConnParams, mysqlStats *stats.Timings
}
}

func (cp *ConnectionPool) getLogWaitCallback() func(time.Time) {
if cp.name == "" {
return func(start time.Time) {} // no op
}
return func(start time.Time) {
cp.mysqlStats.Record(cp.name+"ResourceWaitTime", start)
}
}

// connect is used by the resource pool to create a new Resource.
func (cp *ConnectionPool) connect() (pools.Resource, error) {
c, err := NewDBConnection(cp.info, cp.mysqlStats)
Expand Down
11 changes: 10 additions & 1 deletion go/vt/vttablet/tabletserver/connpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,21 @@ func (cp *Pool) Open(appParams, dbaParams, appDebugParams *mysql.ConnParams) {
f := func() (pools.Resource, error) {
return NewDBConn(cp, appParams)
}
cp.connections = pools.NewResourcePool(cp.name, f, cp.capacity, cp.capacity, cp.idleTimeout, cp.prefillParallelism, tabletenv.WaitStats)
cp.connections = pools.NewResourcePool(f, cp.capacity, cp.capacity, cp.idleTimeout, cp.prefillParallelism, cp.getLogWaitCallback())
cp.appDebugParams = appDebugParams

cp.dbaPool.Open(dbaParams, tabletenv.MySQLStats)
}

func (cp *Pool) getLogWaitCallback() func(time.Time) {
if cp.name == "" {
return func(start time.Time) {} // no op
}
return func(start time.Time) {
tabletenv.WaitStats.Record(cp.name+"ResourceWaitTime", start)
}
}

// Close will close the pool and wait for connections to be returned before
// exiting.
func (cp *Pool) Close() {
Expand Down

0 comments on commit 6b8b6b4

Please sign in to comment.