From 6b8b6b460e61c2a8ca07e9cb56b3c2461df5e676 Mon Sep 17 00:00:00 2001 From: Lloyd Cabancla Date: Thu, 13 Feb 2020 14:22:22 -0500 Subject: [PATCH] Used callback to log resource pool wait time Signed-off-by: Lloyd Cabancla --- go/pools/resource_pool.go | 14 ++---- go/pools/resource_pool_flaky_test.go | 53 ++++++++++++++------ go/vt/dbconnpool/connection_pool.go | 11 +++- go/vt/vttablet/tabletserver/connpool/pool.go | 11 +++- 4 files changed, 61 insertions(+), 28 deletions(-) diff --git a/go/pools/resource_pool.go b/go/pools/resource_pool.go index 60ee91bf993..d3459b06d53 100644 --- a/go/pools/resource_pool.go +++ b/go/pools/resource_pool.go @@ -24,8 +24,6 @@ import ( "sync" "time" - "vitess.io/vitess/go/stats" - "golang.org/x/net/context" "vitess.io/vitess/go/sync2" @@ -73,8 +71,7 @@ type ResourcePool struct { resources chan resourceWrapper factory Factory idleTimer *timer.Timer - waitStats *stats.Timings - name string + logWait func(time.Time) } type resourceWrapper struct { @@ -93,18 +90,17 @@ type resourceWrapper struct { // An idleTimeout of 0 means that there is no timeout. // A non-zero value of prefillParallelism causes the pool to be pre-filled. // The value specifies how many resources can be opened in parallel. -func NewResourcePool(name string, factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int, waitStats *stats.Timings) *ResourcePool { +func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int, logWait func(time.Time)) *ResourcePool { if capacity <= 0 || maxCap <= 0 || capacity > maxCap { panic(errors.New("invalid/out of range capacity")) } rp := &ResourcePool{ - name: name, resources: make(chan resourceWrapper, maxCap), factory: factory, available: sync2.NewAtomicInt64(int64(capacity)), capacity: sync2.NewAtomicInt64(int64(capacity)), idleTimeout: sync2.NewAtomicDuration(idleTimeout), - waitStats: waitStats, + logWait: logWait, } for i := 0; i < capacity; i++ { rp.resources <- resourceWrapper{} @@ -331,9 +327,7 @@ func (rp *ResourcePool) SetCapacity(capacity int) error { func (rp *ResourcePool) recordWait(start time.Time) { rp.waitCount.Add(1) rp.waitTime.Add(time.Since(start)) - if rp.name != "" { - rp.waitStats.Record(rp.name+"ResourceWaitTime", start) - } + rp.logWait(start) } // SetIdleTimeout sets the idle timeout. It can only be used if there was an diff --git a/go/pools/resource_pool_flaky_test.go b/go/pools/resource_pool_flaky_test.go index 851ae5633a3..eeb3f8eda9a 100644 --- a/go/pools/resource_pool_flaky_test.go +++ b/go/pools/resource_pool_flaky_test.go @@ -21,14 +21,12 @@ import ( "testing" "time" - "vitess.io/vitess/go/stats" - "golang.org/x/net/context" "vitess.io/vitess/go/sync2" ) var lastID, count sync2.AtomicInt64 -var waitTimes = stats.NewTimings("", "", "") +var waitStarts []time.Time type TestResource struct { num int64 @@ -42,6 +40,10 @@ func (tr *TestResource) Close() { } } +func logWait(start time.Time) { + waitStarts = append(waitStarts, start) +} + func PoolFactory() (Resource, error) { count.Add(1) return &TestResource{lastID.Add(1), false}, nil @@ -60,7 +62,9 @@ func TestOpen(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool("", PoolFactory, 6, 6, time.Second, 0, waitTimes) + waitStarts = waitStarts[:0] + + p := NewResourcePool(PoolFactory, 6, 6, time.Second, 0, logWait) p.SetCapacity(5) var resources [10]Resource @@ -77,6 +81,9 @@ func TestOpen(t *testing.T) { if p.WaitCount() != 0 { t.Errorf("expecting 0, received %d", p.WaitCount()) } + if len(waitStarts) != 0 { + t.Errorf("expecting 0, received %d", len(waitStarts)) + } if p.WaitTime() != 0 { t.Errorf("expecting 0, received %d", p.WaitTime()) } @@ -112,6 +119,15 @@ func TestOpen(t *testing.T) { if p.WaitCount() != 5 { t.Errorf("Expecting 5, received %d", p.WaitCount()) } + if int64(len(waitStarts)) != p.WaitCount() { + t.Errorf("expecting %d, received %d", p.WaitCount(), len(waitStarts)) + } + // verify start times are monotonic increasing + for i := 1; i < len(waitStarts); i++ { + if waitStarts[i].Before(waitStarts[i-1]) { + t.Errorf("Expecting monotonic increasing start times") + } + } if p.WaitTime() == 0 { t.Errorf("Expecting non-zero") } @@ -201,12 +217,12 @@ func TestOpen(t *testing.T) { func TestPrefill(t *testing.T) { lastID.Set(0) count.Set(0) - p := NewResourcePool("", PoolFactory, 5, 5, time.Second, 1, waitTimes) + p := NewResourcePool(PoolFactory, 5, 5, time.Second, 1, logWait) defer p.Close() if p.Active() != 5 { t.Errorf("p.Active(): %d, want 5", p.Active()) } - p = NewResourcePool("", FailFactory, 5, 5, time.Second, 1, waitTimes) + p = NewResourcePool(FailFactory, 5, 5, time.Second, 1, logWait) defer p.Close() if p.Active() != 0 { t.Errorf("p.Active(): %d, want 0", p.Active()) @@ -221,7 +237,7 @@ func TestPrefillTimeout(t *testing.T) { defer func() { prefillTimeout = saveTimeout }() start := time.Now() - p := NewResourcePool("", SlowFailFactory, 5, 5, time.Second, 1, waitTimes) + p := NewResourcePool(SlowFailFactory, 5, 5, time.Second, 1, logWait) defer p.Close() if elapsed := time.Since(start); elapsed > 20*time.Millisecond { t.Errorf("elapsed: %v, should be around 10ms", elapsed) @@ -235,7 +251,9 @@ func TestShrinking(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool("", PoolFactory, 5, 5, time.Second, 0, waitTimes) + waitStarts = waitStarts[:0] + + p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0, logWait) var resources [10]Resource // Leave one empty slot in the pool for i := 0; i < 4; i++ { @@ -318,6 +336,9 @@ func TestShrinking(t *testing.T) { if p.WaitCount() != 1 { t.Errorf("Expecting 1, received %d", p.WaitCount()) } + if int64(len(waitStarts)) != p.WaitCount() { + t.Errorf("Expecting %d, received %d", p.WaitCount(), len(waitStarts)) + } if count.Get() != 2 { t.Errorf("Expecting 2, received %d", count.Get()) } @@ -374,7 +395,7 @@ func TestClosing(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool("", PoolFactory, 5, 5, time.Second, 0, waitTimes) + p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0, logWait) var resources [10]Resource for i := 0; i < 5; i++ { r, err := p.Get(ctx) @@ -428,7 +449,7 @@ func TestIdleTimeout(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool("", PoolFactory, 1, 1, 10*time.Millisecond, 0, waitTimes) + p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0, logWait) defer p.Close() r, err := p.Get(ctx) @@ -539,7 +560,7 @@ func TestIdleTimeoutCreateFail(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool("", PoolFactory, 1, 1, 10*time.Millisecond, 0, waitTimes) + p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0, logWait) defer p.Close() r, err := p.Get(ctx) if err != nil { @@ -560,7 +581,7 @@ func TestCreateFail(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool("", FailFactory, 5, 5, time.Second, 0, waitTimes) + p := NewResourcePool(FailFactory, 5, 5, time.Second, 0, logWait) defer p.Close() if _, err := p.Get(ctx); err.Error() != "Failed" { t.Errorf("Expecting Failed, received %v", err) @@ -576,7 +597,7 @@ func TestCreateFailOnPut(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool("", PoolFactory, 5, 5, time.Second, 0, waitTimes) + p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0, logWait) defer p.Close() _, err := p.Get(ctx) if err != nil { @@ -593,7 +614,7 @@ func TestSlowCreateFail(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool("", SlowFailFactory, 2, 2, time.Second, 0, waitTimes) + p := NewResourcePool(SlowFailFactory, 2, 2, time.Second, 0, logWait) defer p.Close() ch := make(chan bool) // The third Get should not wait indefinitely @@ -615,7 +636,7 @@ func TestTimeout(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool("", PoolFactory, 1, 1, time.Second, 0, waitTimes) + p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0, logWait) defer p.Close() r, err := p.Get(ctx) if err != nil { @@ -634,7 +655,7 @@ func TestTimeout(t *testing.T) { func TestExpired(t *testing.T) { lastID.Set(0) count.Set(0) - p := NewResourcePool("", PoolFactory, 1, 1, time.Second, 0, waitTimes) + p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0, logWait) defer p.Close() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second)) r, err := p.Get(ctx) diff --git a/go/vt/dbconnpool/connection_pool.go b/go/vt/dbconnpool/connection_pool.go index 876694571a9..6a22e561d6e 100644 --- a/go/vt/dbconnpool/connection_pool.go +++ b/go/vt/dbconnpool/connection_pool.go @@ -147,7 +147,7 @@ func (cp *ConnectionPool) Open(info *mysql.ConnParams, mysqlStats *stats.Timings defer cp.mu.Unlock() cp.info = info cp.mysqlStats = mysqlStats - cp.connections = pools.NewResourcePool(cp.name, cp.connect, cp.capacity, cp.capacity, cp.idleTimeout, 0, mysqlStats) + cp.connections = pools.NewResourcePool(cp.connect, cp.capacity, cp.capacity, cp.idleTimeout, 0, cp.getLogWaitCallback()) // Check if we need to resolve a hostname (The Host is not just an IP address). if cp.resolutionFrequency > 0 && net.ParseIP(info.Host) == nil { cp.hostIsNotIP = true @@ -169,6 +169,15 @@ func (cp *ConnectionPool) Open(info *mysql.ConnParams, mysqlStats *stats.Timings } } +func (cp *ConnectionPool) getLogWaitCallback() func(time.Time) { + if cp.name == "" { + return func(start time.Time) {} // no op + } + return func(start time.Time) { + cp.mysqlStats.Record(cp.name+"ResourceWaitTime", start) + } +} + // connect is used by the resource pool to create a new Resource. func (cp *ConnectionPool) connect() (pools.Resource, error) { c, err := NewDBConnection(cp.info, cp.mysqlStats) diff --git a/go/vt/vttablet/tabletserver/connpool/pool.go b/go/vt/vttablet/tabletserver/connpool/pool.go index eedb63531f3..516bf09b8e8 100644 --- a/go/vt/vttablet/tabletserver/connpool/pool.go +++ b/go/vt/vttablet/tabletserver/connpool/pool.go @@ -122,12 +122,21 @@ func (cp *Pool) Open(appParams, dbaParams, appDebugParams *mysql.ConnParams) { f := func() (pools.Resource, error) { return NewDBConn(cp, appParams) } - cp.connections = pools.NewResourcePool(cp.name, f, cp.capacity, cp.capacity, cp.idleTimeout, cp.prefillParallelism, tabletenv.WaitStats) + cp.connections = pools.NewResourcePool(f, cp.capacity, cp.capacity, cp.idleTimeout, cp.prefillParallelism, cp.getLogWaitCallback()) cp.appDebugParams = appDebugParams cp.dbaPool.Open(dbaParams, tabletenv.MySQLStats) } +func (cp *Pool) getLogWaitCallback() func(time.Time) { + if cp.name == "" { + return func(start time.Time) {} // no op + } + return func(start time.Time) { + tabletenv.WaitStats.Record(cp.name+"ResourceWaitTime", start) + } +} + // Close will close the pool and wait for connections to be returned before // exiting. func (cp *Pool) Close() {