From 5d172c4c2dc9a670ccc77430a4564236d5288ff9 Mon Sep 17 00:00:00 2001 From: Lloyd Cabancla Date: Thu, 16 Jan 2020 16:45:24 -0500 Subject: [PATCH 1/2] Added resource pool wait time histogram metrics Signed-off-by: Lloyd Cabancla --- go/pools/resource_pool.go | 11 ++++- go/pools/resource_pool_flaky_test.go | 29 ++++++++------ go/vt/dbconnpool/connection_pool.go | 5 ++- go/vt/vttablet/endtoend/misc_test.go | 4 -- go/vt/vttablet/endtoend/transaction_test.go | 42 ++++++++++++++++++++ go/vt/vttablet/tabletserver/connpool/pool.go | 2 +- 6 files changed, 72 insertions(+), 21 deletions(-) diff --git a/go/pools/resource_pool.go b/go/pools/resource_pool.go index 63b8309137b..60ee91bf993 100644 --- a/go/pools/resource_pool.go +++ b/go/pools/resource_pool.go @@ -24,6 +24,8 @@ import ( "sync" "time" + "vitess.io/vitess/go/stats" + "golang.org/x/net/context" "vitess.io/vitess/go/sync2" @@ -71,6 +73,8 @@ type ResourcePool struct { resources chan resourceWrapper factory Factory idleTimer *timer.Timer + waitStats *stats.Timings + name string } type resourceWrapper struct { @@ -89,16 +93,18 @@ type resourceWrapper struct { // An idleTimeout of 0 means that there is no timeout. // A non-zero value of prefillParallelism causes the pool to be pre-filled. // The value specifies how many resources can be opened in parallel. -func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int) *ResourcePool { +func NewResourcePool(name string, factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int, waitStats *stats.Timings) *ResourcePool { if capacity <= 0 || maxCap <= 0 || capacity > maxCap { panic(errors.New("invalid/out of range capacity")) } rp := &ResourcePool{ + name: name, resources: make(chan resourceWrapper, maxCap), factory: factory, available: sync2.NewAtomicInt64(int64(capacity)), capacity: sync2.NewAtomicInt64(int64(capacity)), idleTimeout: sync2.NewAtomicDuration(idleTimeout), + waitStats: waitStats, } for i := 0; i < capacity; i++ { rp.resources <- resourceWrapper{} @@ -325,6 +331,9 @@ func (rp *ResourcePool) SetCapacity(capacity int) error { func (rp *ResourcePool) recordWait(start time.Time) { rp.waitCount.Add(1) rp.waitTime.Add(time.Since(start)) + if rp.name != "" { + rp.waitStats.Record(rp.name+"ResourceWaitTime", start) + } } // SetIdleTimeout sets the idle timeout. It can only be used if there was an diff --git a/go/pools/resource_pool_flaky_test.go b/go/pools/resource_pool_flaky_test.go index f3950e5e23e..851ae5633a3 100644 --- a/go/pools/resource_pool_flaky_test.go +++ b/go/pools/resource_pool_flaky_test.go @@ -21,11 +21,14 @@ import ( "testing" "time" + "vitess.io/vitess/go/stats" + "golang.org/x/net/context" "vitess.io/vitess/go/sync2" ) var lastID, count sync2.AtomicInt64 +var waitTimes = stats.NewTimings("", "", "") type TestResource struct { num int64 @@ -57,7 +60,7 @@ func TestOpen(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool(PoolFactory, 6, 6, time.Second, 0) + p := NewResourcePool("", PoolFactory, 6, 6, time.Second, 0, waitTimes) p.SetCapacity(5) var resources [10]Resource @@ -198,12 +201,12 @@ func TestOpen(t *testing.T) { func TestPrefill(t *testing.T) { lastID.Set(0) count.Set(0) - p := NewResourcePool(PoolFactory, 5, 5, time.Second, 1) + p := NewResourcePool("", PoolFactory, 5, 5, time.Second, 1, waitTimes) defer p.Close() if p.Active() != 5 { t.Errorf("p.Active(): %d, want 5", p.Active()) } - p = NewResourcePool(FailFactory, 5, 5, time.Second, 1) + p = NewResourcePool("", FailFactory, 5, 5, time.Second, 1, waitTimes) defer p.Close() if p.Active() != 0 { t.Errorf("p.Active(): %d, want 0", p.Active()) @@ -218,7 +221,7 @@ func TestPrefillTimeout(t *testing.T) { defer func() { prefillTimeout = saveTimeout }() start := time.Now() - p := NewResourcePool(SlowFailFactory, 5, 5, time.Second, 1) + p := NewResourcePool("", SlowFailFactory, 5, 5, time.Second, 1, waitTimes) defer p.Close() if elapsed := time.Since(start); elapsed > 20*time.Millisecond { t.Errorf("elapsed: %v, should be around 10ms", elapsed) @@ -232,7 +235,7 @@ func TestShrinking(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0) + p := NewResourcePool("", PoolFactory, 5, 5, time.Second, 0, waitTimes) var resources [10]Resource // Leave one empty slot in the pool for i := 0; i < 4; i++ { @@ -371,7 +374,7 @@ func TestClosing(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0) + p := NewResourcePool("", PoolFactory, 5, 5, time.Second, 0, waitTimes) var resources [10]Resource for i := 0; i < 5; i++ { r, err := p.Get(ctx) @@ -425,7 +428,7 @@ func TestIdleTimeout(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0) + p := NewResourcePool("", PoolFactory, 1, 1, 10*time.Millisecond, 0, waitTimes) defer p.Close() r, err := p.Get(ctx) @@ -536,7 +539,7 @@ func TestIdleTimeoutCreateFail(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0) + p := NewResourcePool("", PoolFactory, 1, 1, 10*time.Millisecond, 0, waitTimes) defer p.Close() r, err := p.Get(ctx) if err != nil { @@ -557,7 +560,7 @@ func TestCreateFail(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool(FailFactory, 5, 5, time.Second, 0) + p := NewResourcePool("", FailFactory, 5, 5, time.Second, 0, waitTimes) defer p.Close() if _, err := p.Get(ctx); err.Error() != "Failed" { t.Errorf("Expecting Failed, received %v", err) @@ -573,7 +576,7 @@ func TestCreateFailOnPut(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0) + p := NewResourcePool("", PoolFactory, 5, 5, time.Second, 0, waitTimes) defer p.Close() _, err := p.Get(ctx) if err != nil { @@ -590,7 +593,7 @@ func TestSlowCreateFail(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool(SlowFailFactory, 2, 2, time.Second, 0) + p := NewResourcePool("", SlowFailFactory, 2, 2, time.Second, 0, waitTimes) defer p.Close() ch := make(chan bool) // The third Get should not wait indefinitely @@ -612,7 +615,7 @@ func TestTimeout(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0) + p := NewResourcePool("", PoolFactory, 1, 1, time.Second, 0, waitTimes) defer p.Close() r, err := p.Get(ctx) if err != nil { @@ -631,7 +634,7 @@ func TestTimeout(t *testing.T) { func TestExpired(t *testing.T) { lastID.Set(0) count.Set(0) - p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0) + p := NewResourcePool("", PoolFactory, 1, 1, time.Second, 0, waitTimes) defer p.Close() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second)) r, err := p.Get(ctx) diff --git a/go/vt/dbconnpool/connection_pool.go b/go/vt/dbconnpool/connection_pool.go index 472f4942cde..876694571a9 100644 --- a/go/vt/dbconnpool/connection_pool.go +++ b/go/vt/dbconnpool/connection_pool.go @@ -65,12 +65,13 @@ type ConnectionPool struct { hostIsNotIP bool mysqlStats *stats.Timings + name string } // NewConnectionPool creates a new ConnectionPool. The name is used // to publish stats only. func NewConnectionPool(name string, capacity int, idleTimeout time.Duration, dnsResolutionFrequency time.Duration) *ConnectionPool { - cp := &ConnectionPool{capacity: capacity, idleTimeout: idleTimeout, resolutionFrequency: dnsResolutionFrequency} + cp := &ConnectionPool{name: name, capacity: capacity, idleTimeout: idleTimeout, resolutionFrequency: dnsResolutionFrequency} if name == "" || usedNames[name] { return cp } @@ -146,7 +147,7 @@ func (cp *ConnectionPool) Open(info *mysql.ConnParams, mysqlStats *stats.Timings defer cp.mu.Unlock() cp.info = info cp.mysqlStats = mysqlStats - cp.connections = pools.NewResourcePool(cp.connect, cp.capacity, cp.capacity, cp.idleTimeout, 0) + cp.connections = pools.NewResourcePool(cp.name, cp.connect, cp.capacity, cp.capacity, cp.idleTimeout, 0, mysqlStats) // Check if we need to resolve a hostname (The Host is not just an IP address). if cp.resolutionFrequency > 0 && net.ParseIP(info.Host) == nil { cp.hostIsNotIP = true diff --git a/go/vt/vttablet/endtoend/misc_test.go b/go/vt/vttablet/endtoend/misc_test.go index eac102015d5..f463514f150 100644 --- a/go/vt/vttablet/endtoend/misc_test.go +++ b/go/vt/vttablet/endtoend/misc_test.go @@ -296,10 +296,6 @@ func TestConsolidation(t *testing.T) { wg.Wait() vend := framework.DebugVars() - if err := compareIntDiff(vend, "Waits/TotalCount", vstart, 1); err != nil { - t.Logf("DebugVars Waits/TotalCount not incremented with sleep=%v", sleep) - continue - } if err := compareIntDiff(vend, "Waits/Histograms/Consolidations/Count", vstart, 1); err != nil { t.Logf("DebugVars Waits/Histograms/Consolidations/Count not incremented with sleep=%v", sleep) continue diff --git a/go/vt/vttablet/endtoend/transaction_test.go b/go/vt/vttablet/endtoend/transaction_test.go index eecb7d74526..63e4444edd1 100644 --- a/go/vt/vttablet/endtoend/transaction_test.go +++ b/go/vt/vttablet/endtoend/transaction_test.go @@ -20,6 +20,7 @@ import ( "fmt" "reflect" "strings" + "sync" "testing" "time" @@ -823,3 +824,44 @@ func TestManualTwopcz(t *testing.T) { fmt.Print("Sleeping for 30 seconds\n") time.Sleep(30 * time.Second) } + +func TestTransactionPoolResourceWaitTime(t *testing.T) { + defer framework.Server.SetPoolSize(framework.Server.TxPoolSize()) + defer framework.Server.SetTxPoolTimeout(framework.Server.TxPoolTimeout()) + framework.Server.SetTxPoolSize(1) + framework.Server.SetTxPoolTimeout(10 * time.Second) + debugVarPath := "Waits/Histograms/TransactionPoolResourceWaitTime/Count" + + for sleep := 0.1; sleep < 10.0; sleep *= 2 { + vstart := framework.DebugVars() + var wg sync.WaitGroup + wg.Add(2) + + transactionFunc := func() { + client := framework.NewClient() + + bv := map[string]*querypb.BindVariable{} + query := fmt.Sprintf("select sleep(%v) from dual", sleep) + if _, err := client.BeginExecute(query, bv); err != nil { + t.Error(err) + return + } + if err := client.Rollback(); err != nil { + t.Error(err) + return + } + wg.Done() + } + go transactionFunc() + go transactionFunc() + wg.Wait() + vend := framework.DebugVars() + if err := compareIntDiff(vend, debugVarPath, vstart, 1); err != nil { + t.Logf("DebugVars %v not incremented with sleep=%v", debugVarPath, sleep) + continue + } + t.Logf("DebugVars %v properly incremented with sleep=%v", debugVarPath, sleep) + return + } + t.Errorf("DebugVars %v not incremented", debugVarPath) +} diff --git a/go/vt/vttablet/tabletserver/connpool/pool.go b/go/vt/vttablet/tabletserver/connpool/pool.go index 1c32929e91d..eedb63531f3 100644 --- a/go/vt/vttablet/tabletserver/connpool/pool.go +++ b/go/vt/vttablet/tabletserver/connpool/pool.go @@ -122,7 +122,7 @@ func (cp *Pool) Open(appParams, dbaParams, appDebugParams *mysql.ConnParams) { f := func() (pools.Resource, error) { return NewDBConn(cp, appParams) } - cp.connections = pools.NewResourcePool(f, cp.capacity, cp.capacity, cp.idleTimeout, cp.prefillParallelism) + cp.connections = pools.NewResourcePool(cp.name, f, cp.capacity, cp.capacity, cp.idleTimeout, cp.prefillParallelism, tabletenv.WaitStats) cp.appDebugParams = appDebugParams cp.dbaPool.Open(dbaParams, tabletenv.MySQLStats) From 6b8b6b460e61c2a8ca07e9cb56b3c2461df5e676 Mon Sep 17 00:00:00 2001 From: Lloyd Cabancla Date: Thu, 13 Feb 2020 14:22:22 -0500 Subject: [PATCH 2/2] Used callback to log resource pool wait time Signed-off-by: Lloyd Cabancla --- go/pools/resource_pool.go | 14 ++---- go/pools/resource_pool_flaky_test.go | 53 ++++++++++++++------ go/vt/dbconnpool/connection_pool.go | 11 +++- go/vt/vttablet/tabletserver/connpool/pool.go | 11 +++- 4 files changed, 61 insertions(+), 28 deletions(-) diff --git a/go/pools/resource_pool.go b/go/pools/resource_pool.go index 60ee91bf993..d3459b06d53 100644 --- a/go/pools/resource_pool.go +++ b/go/pools/resource_pool.go @@ -24,8 +24,6 @@ import ( "sync" "time" - "vitess.io/vitess/go/stats" - "golang.org/x/net/context" "vitess.io/vitess/go/sync2" @@ -73,8 +71,7 @@ type ResourcePool struct { resources chan resourceWrapper factory Factory idleTimer *timer.Timer - waitStats *stats.Timings - name string + logWait func(time.Time) } type resourceWrapper struct { @@ -93,18 +90,17 @@ type resourceWrapper struct { // An idleTimeout of 0 means that there is no timeout. // A non-zero value of prefillParallelism causes the pool to be pre-filled. // The value specifies how many resources can be opened in parallel. -func NewResourcePool(name string, factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int, waitStats *stats.Timings) *ResourcePool { +func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int, logWait func(time.Time)) *ResourcePool { if capacity <= 0 || maxCap <= 0 || capacity > maxCap { panic(errors.New("invalid/out of range capacity")) } rp := &ResourcePool{ - name: name, resources: make(chan resourceWrapper, maxCap), factory: factory, available: sync2.NewAtomicInt64(int64(capacity)), capacity: sync2.NewAtomicInt64(int64(capacity)), idleTimeout: sync2.NewAtomicDuration(idleTimeout), - waitStats: waitStats, + logWait: logWait, } for i := 0; i < capacity; i++ { rp.resources <- resourceWrapper{} @@ -331,9 +327,7 @@ func (rp *ResourcePool) SetCapacity(capacity int) error { func (rp *ResourcePool) recordWait(start time.Time) { rp.waitCount.Add(1) rp.waitTime.Add(time.Since(start)) - if rp.name != "" { - rp.waitStats.Record(rp.name+"ResourceWaitTime", start) - } + rp.logWait(start) } // SetIdleTimeout sets the idle timeout. It can only be used if there was an diff --git a/go/pools/resource_pool_flaky_test.go b/go/pools/resource_pool_flaky_test.go index 851ae5633a3..eeb3f8eda9a 100644 --- a/go/pools/resource_pool_flaky_test.go +++ b/go/pools/resource_pool_flaky_test.go @@ -21,14 +21,12 @@ import ( "testing" "time" - "vitess.io/vitess/go/stats" - "golang.org/x/net/context" "vitess.io/vitess/go/sync2" ) var lastID, count sync2.AtomicInt64 -var waitTimes = stats.NewTimings("", "", "") +var waitStarts []time.Time type TestResource struct { num int64 @@ -42,6 +40,10 @@ func (tr *TestResource) Close() { } } +func logWait(start time.Time) { + waitStarts = append(waitStarts, start) +} + func PoolFactory() (Resource, error) { count.Add(1) return &TestResource{lastID.Add(1), false}, nil @@ -60,7 +62,9 @@ func TestOpen(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool("", PoolFactory, 6, 6, time.Second, 0, waitTimes) + waitStarts = waitStarts[:0] + + p := NewResourcePool(PoolFactory, 6, 6, time.Second, 0, logWait) p.SetCapacity(5) var resources [10]Resource @@ -77,6 +81,9 @@ func TestOpen(t *testing.T) { if p.WaitCount() != 0 { t.Errorf("expecting 0, received %d", p.WaitCount()) } + if len(waitStarts) != 0 { + t.Errorf("expecting 0, received %d", len(waitStarts)) + } if p.WaitTime() != 0 { t.Errorf("expecting 0, received %d", p.WaitTime()) } @@ -112,6 +119,15 @@ func TestOpen(t *testing.T) { if p.WaitCount() != 5 { t.Errorf("Expecting 5, received %d", p.WaitCount()) } + if int64(len(waitStarts)) != p.WaitCount() { + t.Errorf("expecting %d, received %d", p.WaitCount(), len(waitStarts)) + } + // verify start times are monotonic increasing + for i := 1; i < len(waitStarts); i++ { + if waitStarts[i].Before(waitStarts[i-1]) { + t.Errorf("Expecting monotonic increasing start times") + } + } if p.WaitTime() == 0 { t.Errorf("Expecting non-zero") } @@ -201,12 +217,12 @@ func TestOpen(t *testing.T) { func TestPrefill(t *testing.T) { lastID.Set(0) count.Set(0) - p := NewResourcePool("", PoolFactory, 5, 5, time.Second, 1, waitTimes) + p := NewResourcePool(PoolFactory, 5, 5, time.Second, 1, logWait) defer p.Close() if p.Active() != 5 { t.Errorf("p.Active(): %d, want 5", p.Active()) } - p = NewResourcePool("", FailFactory, 5, 5, time.Second, 1, waitTimes) + p = NewResourcePool(FailFactory, 5, 5, time.Second, 1, logWait) defer p.Close() if p.Active() != 0 { t.Errorf("p.Active(): %d, want 0", p.Active()) @@ -221,7 +237,7 @@ func TestPrefillTimeout(t *testing.T) { defer func() { prefillTimeout = saveTimeout }() start := time.Now() - p := NewResourcePool("", SlowFailFactory, 5, 5, time.Second, 1, waitTimes) + p := NewResourcePool(SlowFailFactory, 5, 5, time.Second, 1, logWait) defer p.Close() if elapsed := time.Since(start); elapsed > 20*time.Millisecond { t.Errorf("elapsed: %v, should be around 10ms", elapsed) @@ -235,7 +251,9 @@ func TestShrinking(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool("", PoolFactory, 5, 5, time.Second, 0, waitTimes) + waitStarts = waitStarts[:0] + + p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0, logWait) var resources [10]Resource // Leave one empty slot in the pool for i := 0; i < 4; i++ { @@ -318,6 +336,9 @@ func TestShrinking(t *testing.T) { if p.WaitCount() != 1 { t.Errorf("Expecting 1, received %d", p.WaitCount()) } + if int64(len(waitStarts)) != p.WaitCount() { + t.Errorf("Expecting %d, received %d", p.WaitCount(), len(waitStarts)) + } if count.Get() != 2 { t.Errorf("Expecting 2, received %d", count.Get()) } @@ -374,7 +395,7 @@ func TestClosing(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool("", PoolFactory, 5, 5, time.Second, 0, waitTimes) + p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0, logWait) var resources [10]Resource for i := 0; i < 5; i++ { r, err := p.Get(ctx) @@ -428,7 +449,7 @@ func TestIdleTimeout(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool("", PoolFactory, 1, 1, 10*time.Millisecond, 0, waitTimes) + p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0, logWait) defer p.Close() r, err := p.Get(ctx) @@ -539,7 +560,7 @@ func TestIdleTimeoutCreateFail(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool("", PoolFactory, 1, 1, 10*time.Millisecond, 0, waitTimes) + p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0, logWait) defer p.Close() r, err := p.Get(ctx) if err != nil { @@ -560,7 +581,7 @@ func TestCreateFail(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool("", FailFactory, 5, 5, time.Second, 0, waitTimes) + p := NewResourcePool(FailFactory, 5, 5, time.Second, 0, logWait) defer p.Close() if _, err := p.Get(ctx); err.Error() != "Failed" { t.Errorf("Expecting Failed, received %v", err) @@ -576,7 +597,7 @@ func TestCreateFailOnPut(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool("", PoolFactory, 5, 5, time.Second, 0, waitTimes) + p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0, logWait) defer p.Close() _, err := p.Get(ctx) if err != nil { @@ -593,7 +614,7 @@ func TestSlowCreateFail(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool("", SlowFailFactory, 2, 2, time.Second, 0, waitTimes) + p := NewResourcePool(SlowFailFactory, 2, 2, time.Second, 0, logWait) defer p.Close() ch := make(chan bool) // The third Get should not wait indefinitely @@ -615,7 +636,7 @@ func TestTimeout(t *testing.T) { ctx := context.Background() lastID.Set(0) count.Set(0) - p := NewResourcePool("", PoolFactory, 1, 1, time.Second, 0, waitTimes) + p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0, logWait) defer p.Close() r, err := p.Get(ctx) if err != nil { @@ -634,7 +655,7 @@ func TestTimeout(t *testing.T) { func TestExpired(t *testing.T) { lastID.Set(0) count.Set(0) - p := NewResourcePool("", PoolFactory, 1, 1, time.Second, 0, waitTimes) + p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0, logWait) defer p.Close() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second)) r, err := p.Get(ctx) diff --git a/go/vt/dbconnpool/connection_pool.go b/go/vt/dbconnpool/connection_pool.go index 876694571a9..6a22e561d6e 100644 --- a/go/vt/dbconnpool/connection_pool.go +++ b/go/vt/dbconnpool/connection_pool.go @@ -147,7 +147,7 @@ func (cp *ConnectionPool) Open(info *mysql.ConnParams, mysqlStats *stats.Timings defer cp.mu.Unlock() cp.info = info cp.mysqlStats = mysqlStats - cp.connections = pools.NewResourcePool(cp.name, cp.connect, cp.capacity, cp.capacity, cp.idleTimeout, 0, mysqlStats) + cp.connections = pools.NewResourcePool(cp.connect, cp.capacity, cp.capacity, cp.idleTimeout, 0, cp.getLogWaitCallback()) // Check if we need to resolve a hostname (The Host is not just an IP address). if cp.resolutionFrequency > 0 && net.ParseIP(info.Host) == nil { cp.hostIsNotIP = true @@ -169,6 +169,15 @@ func (cp *ConnectionPool) Open(info *mysql.ConnParams, mysqlStats *stats.Timings } } +func (cp *ConnectionPool) getLogWaitCallback() func(time.Time) { + if cp.name == "" { + return func(start time.Time) {} // no op + } + return func(start time.Time) { + cp.mysqlStats.Record(cp.name+"ResourceWaitTime", start) + } +} + // connect is used by the resource pool to create a new Resource. func (cp *ConnectionPool) connect() (pools.Resource, error) { c, err := NewDBConnection(cp.info, cp.mysqlStats) diff --git a/go/vt/vttablet/tabletserver/connpool/pool.go b/go/vt/vttablet/tabletserver/connpool/pool.go index eedb63531f3..516bf09b8e8 100644 --- a/go/vt/vttablet/tabletserver/connpool/pool.go +++ b/go/vt/vttablet/tabletserver/connpool/pool.go @@ -122,12 +122,21 @@ func (cp *Pool) Open(appParams, dbaParams, appDebugParams *mysql.ConnParams) { f := func() (pools.Resource, error) { return NewDBConn(cp, appParams) } - cp.connections = pools.NewResourcePool(cp.name, f, cp.capacity, cp.capacity, cp.idleTimeout, cp.prefillParallelism, tabletenv.WaitStats) + cp.connections = pools.NewResourcePool(f, cp.capacity, cp.capacity, cp.idleTimeout, cp.prefillParallelism, cp.getLogWaitCallback()) cp.appDebugParams = appDebugParams cp.dbaPool.Open(dbaParams, tabletenv.MySQLStats) } +func (cp *Pool) getLogWaitCallback() func(time.Time) { + if cp.name == "" { + return func(start time.Time) {} // no op + } + return func(start time.Time) { + tabletenv.WaitStats.Record(cp.name+"ResourceWaitTime", start) + } +} + // Close will close the pool and wait for connections to be returned before // exiting. func (cp *Pool) Close() {