Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added resource pool wait time histogram metrics #5727

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion go/pools/resource_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type ResourcePool struct {
resources chan resourceWrapper
factory Factory
idleTimer *timer.Timer
logWait func(time.Time)
}

type resourceWrapper struct {
Expand All @@ -89,7 +90,7 @@ type resourceWrapper struct {
// An idleTimeout of 0 means that there is no timeout.
// A non-zero value of prefillParallelism causes the pool to be pre-filled.
// The value specifies how many resources can be opened in parallel.
func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int) *ResourcePool {
func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int, logWait func(time.Time)) *ResourcePool {
if capacity <= 0 || maxCap <= 0 || capacity > maxCap {
panic(errors.New("invalid/out of range capacity"))
}
Expand All @@ -99,6 +100,7 @@ func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Dur
available: sync2.NewAtomicInt64(int64(capacity)),
capacity: sync2.NewAtomicInt64(int64(capacity)),
idleTimeout: sync2.NewAtomicDuration(idleTimeout),
logWait: logWait,
}
for i := 0; i < capacity; i++ {
rp.resources <- resourceWrapper{}
Expand Down Expand Up @@ -325,6 +327,7 @@ func (rp *ResourcePool) SetCapacity(capacity int) error {
// recordWait accounts for one completed wait on the pool. It increments the
// wait counter, adds the time elapsed since start to the cumulative wait
// time, and then forwards start to the logWait callback if one was provided.
func (rp *ResourcePool) recordWait(start time.Time) {
	rp.waitCount.Add(1)
	rp.waitTime.Add(time.Since(start))
	// logWait comes straight from the NewResourcePool argument, so it is nil
	// when the caller does not want external wait reporting. Calling a nil
	// func value panics, hence the guard.
	if rp.logWait != nil {
		rp.logWait(start)
	}
}

// SetIdleTimeout sets the idle timeout. It can only be used if there was an
Expand Down
50 changes: 37 additions & 13 deletions go/pools/resource_pool_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
)

var lastID, count sync2.AtomicInt64
var waitStarts []time.Time

type TestResource struct {
num int64
Expand All @@ -39,6 +40,10 @@ func (tr *TestResource) Close() {
}
}

// logWait is the wait callback these tests hand to NewResourcePool. Every
// invocation appends the reported start time to the package-level waitStarts
// slice, which the tests inspect afterwards.
// NOTE(review): the append is unsynchronized — confirm the pool never invokes
// the callback from two goroutines at once (run the tests with -race).
func logWait(start time.Time) {
	recorded := append(waitStarts, start)
	waitStarts = recorded
}

func PoolFactory() (Resource, error) {
count.Add(1)
return &TestResource{lastID.Add(1), false}, nil
Expand All @@ -57,7 +62,9 @@ func TestOpen(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 6, 6, time.Second, 0)
waitStarts = waitStarts[:0]

p := NewResourcePool(PoolFactory, 6, 6, time.Second, 0, logWait)
p.SetCapacity(5)
var resources [10]Resource

Expand All @@ -74,6 +81,9 @@ func TestOpen(t *testing.T) {
if p.WaitCount() != 0 {
t.Errorf("expecting 0, received %d", p.WaitCount())
}
if len(waitStarts) != 0 {
t.Errorf("expecting 0, received %d", len(waitStarts))
}
if p.WaitTime() != 0 {
t.Errorf("expecting 0, received %d", p.WaitTime())
}
Expand Down Expand Up @@ -109,6 +119,15 @@ func TestOpen(t *testing.T) {
if p.WaitCount() != 5 {
t.Errorf("Expecting 5, received %d", p.WaitCount())
}
if int64(len(waitStarts)) != p.WaitCount() {
t.Errorf("expecting %d, received %d", p.WaitCount(), len(waitStarts))
}
// verify start times are monotonic increasing
for i := 1; i < len(waitStarts); i++ {
if waitStarts[i].Before(waitStarts[i-1]) {
t.Errorf("Expecting monotonic increasing start times")
}
}
if p.WaitTime() == 0 {
t.Errorf("Expecting non-zero")
}
Expand Down Expand Up @@ -198,12 +217,12 @@ func TestOpen(t *testing.T) {
func TestPrefill(t *testing.T) {
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 1)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 1, logWait)
defer p.Close()
if p.Active() != 5 {
t.Errorf("p.Active(): %d, want 5", p.Active())
}
p = NewResourcePool(FailFactory, 5, 5, time.Second, 1)
p = NewResourcePool(FailFactory, 5, 5, time.Second, 1, logWait)
defer p.Close()
if p.Active() != 0 {
t.Errorf("p.Active(): %d, want 0", p.Active())
Expand All @@ -218,7 +237,7 @@ func TestPrefillTimeout(t *testing.T) {
defer func() { prefillTimeout = saveTimeout }()

start := time.Now()
p := NewResourcePool(SlowFailFactory, 5, 5, time.Second, 1)
p := NewResourcePool(SlowFailFactory, 5, 5, time.Second, 1, logWait)
defer p.Close()
if elapsed := time.Since(start); elapsed > 20*time.Millisecond {
t.Errorf("elapsed: %v, should be around 10ms", elapsed)
Expand All @@ -232,7 +251,9 @@ func TestShrinking(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0)
waitStarts = waitStarts[:0]

p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0, logWait)
var resources [10]Resource
// Leave one empty slot in the pool
for i := 0; i < 4; i++ {
Expand Down Expand Up @@ -315,6 +336,9 @@ func TestShrinking(t *testing.T) {
if p.WaitCount() != 1 {
t.Errorf("Expecting 1, received %d", p.WaitCount())
}
if int64(len(waitStarts)) != p.WaitCount() {
t.Errorf("Expecting %d, received %d", p.WaitCount(), len(waitStarts))
}
if count.Get() != 2 {
t.Errorf("Expecting 2, received %d", count.Get())
}
Expand Down Expand Up @@ -371,7 +395,7 @@ func TestClosing(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0, logWait)
var resources [10]Resource
for i := 0; i < 5; i++ {
r, err := p.Get(ctx)
Expand Down Expand Up @@ -425,7 +449,7 @@ func TestIdleTimeout(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0)
p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0, logWait)
defer p.Close()

r, err := p.Get(ctx)
Expand Down Expand Up @@ -536,7 +560,7 @@ func TestIdleTimeoutCreateFail(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0)
p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0, logWait)
defer p.Close()
r, err := p.Get(ctx)
if err != nil {
Expand All @@ -557,7 +581,7 @@ func TestCreateFail(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(FailFactory, 5, 5, time.Second, 0)
p := NewResourcePool(FailFactory, 5, 5, time.Second, 0, logWait)
defer p.Close()
if _, err := p.Get(ctx); err.Error() != "Failed" {
t.Errorf("Expecting Failed, received %v", err)
Expand All @@ -573,7 +597,7 @@ func TestCreateFailOnPut(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0, logWait)
defer p.Close()
_, err := p.Get(ctx)
if err != nil {
Expand All @@ -590,7 +614,7 @@ func TestSlowCreateFail(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(SlowFailFactory, 2, 2, time.Second, 0)
p := NewResourcePool(SlowFailFactory, 2, 2, time.Second, 0, logWait)
defer p.Close()
ch := make(chan bool)
// The third Get should not wait indefinitely
Expand All @@ -612,7 +636,7 @@ func TestTimeout(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0)
p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0, logWait)
defer p.Close()
r, err := p.Get(ctx)
if err != nil {
Expand All @@ -631,7 +655,7 @@ func TestTimeout(t *testing.T) {
func TestExpired(t *testing.T) {
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0)
p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0, logWait)
defer p.Close()
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second))
r, err := p.Get(ctx)
Expand Down
14 changes: 12 additions & 2 deletions go/vt/dbconnpool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,13 @@ type ConnectionPool struct {
hostIsNotIP bool

mysqlStats *stats.Timings
name string
}

// NewConnectionPool creates a new ConnectionPool. The name is used
// to publish stats only.
func NewConnectionPool(name string, capacity int, idleTimeout time.Duration, dnsResolutionFrequency time.Duration) *ConnectionPool {
cp := &ConnectionPool{capacity: capacity, idleTimeout: idleTimeout, resolutionFrequency: dnsResolutionFrequency}
cp := &ConnectionPool{name: name, capacity: capacity, idleTimeout: idleTimeout, resolutionFrequency: dnsResolutionFrequency}
if name == "" || usedNames[name] {
return cp
}
Expand Down Expand Up @@ -146,7 +147,7 @@ func (cp *ConnectionPool) Open(info *mysql.ConnParams, mysqlStats *stats.Timings
defer cp.mu.Unlock()
cp.info = info
cp.mysqlStats = mysqlStats
cp.connections = pools.NewResourcePool(cp.connect, cp.capacity, cp.capacity, cp.idleTimeout, 0)
cp.connections = pools.NewResourcePool(cp.connect, cp.capacity, cp.capacity, cp.idleTimeout, 0, cp.getLogWaitCallback())
// Check if we need to resolve a hostname (The Host is not just an IP address).
if cp.resolutionFrequency > 0 && net.ParseIP(info.Host) == nil {
cp.hostIsNotIP = true
Expand All @@ -168,6 +169,15 @@ func (cp *ConnectionPool) Open(info *mysql.ConnParams, mysqlStats *stats.Timings
}
}

// getLogWaitCallback builds the wait callback that Open hands to the
// resource pool. A pool without a name gets a callback that does nothing;
// a named pool gets one that passes the wait's start time to
// cp.mysqlStats.Record under the key "<name>ResourceWaitTime".
func (cp *ConnectionPool) getLogWaitCallback() func(time.Time) {
	if cp.name != "" {
		return func(waitStart time.Time) {
			// name and mysqlStats are read when the callback fires, not
			// when it is built.
			statKey := cp.name + "ResourceWaitTime"
			cp.mysqlStats.Record(statKey, waitStart)
		}
	}
	// Unnamed pool: nothing is recorded.
	return func(time.Time) {}
}

// connect is used by the resource pool to create a new Resource.
func (cp *ConnectionPool) connect() (pools.Resource, error) {
c, err := NewDBConnection(cp.info, cp.mysqlStats)
Expand Down
4 changes: 0 additions & 4 deletions go/vt/vttablet/endtoend/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,6 @@ func TestConsolidation(t *testing.T) {
wg.Wait()

vend := framework.DebugVars()
if err := compareIntDiff(vend, "Waits/TotalCount", vstart, 1); err != nil {
t.Logf("DebugVars Waits/TotalCount not incremented with sleep=%v", sleep)
continue
}
if err := compareIntDiff(vend, "Waits/Histograms/Consolidations/Count", vstart, 1); err != nil {
t.Logf("DebugVars Waits/Histograms/Consolidations/Count not incremented with sleep=%v", sleep)
continue
Expand Down
42 changes: 42 additions & 0 deletions go/vt/vttablet/endtoend/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"reflect"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -823,3 +824,44 @@ func TestManualTwopcz(t *testing.T) {
fmt.Print("Sleeping for 30 seconds\n")
time.Sleep(30 * time.Second)
}

// TestTransactionPoolResourceWaitTime checks that waiting for a transaction
// pool connection bumps the
// Waits/Histograms/TransactionPoolResourceWaitTime/Count debug var.
//
// The transaction pool is shrunk to a single connection and two transactions,
// each running "select sleep(N)", are started concurrently, so one of them is
// expected to wait for the other. Because the outcome depends on timing, the
// attempt is repeated with a doubling sleep; the test fails only if no
// attempt shows the expected change in the counter.
func TestTransactionPoolResourceWaitTime(t *testing.T) {
	// Restore the transaction pool settings changed below. The size must be
	// restored through SetTxPoolSize, the same setter used to change it.
	defer framework.Server.SetTxPoolSize(framework.Server.TxPoolSize())
	defer framework.Server.SetTxPoolTimeout(framework.Server.TxPoolTimeout())
	framework.Server.SetTxPoolSize(1)
	framework.Server.SetTxPoolTimeout(10 * time.Second)
	debugVarPath := "Waits/Histograms/TransactionPoolResourceWaitTime/Count"

	for sleep := 0.1; sleep < 10.0; sleep *= 2 {
		vstart := framework.DebugVars()
		var wg sync.WaitGroup
		wg.Add(2)

		transactionFunc := func() {
			// Deferred so the error returns below also release the
			// WaitGroup; otherwise wg.Wait would block forever.
			defer wg.Done()

			client := framework.NewClient()

			bv := map[string]*querypb.BindVariable{}
			query := fmt.Sprintf("select sleep(%v) from dual", sleep)
			if _, err := client.BeginExecute(query, bv); err != nil {
				t.Error(err)
				return
			}
			if err := client.Rollback(); err != nil {
				t.Error(err)
				return
			}
		}
		go transactionFunc()
		go transactionFunc()
		wg.Wait()

		vend := framework.DebugVars()
		// Presumably compareIntDiff errors unless the value at debugVarPath
		// changed by exactly 1 between vstart and vend — confirm against its
		// definition.
		if err := compareIntDiff(vend, debugVarPath, vstart, 1); err != nil {
			t.Logf("DebugVars %v not incremented with sleep=%v", debugVarPath, sleep)
			continue
		}
		t.Logf("DebugVars %v properly incremented with sleep=%v", debugVarPath, sleep)
		return
	}
	t.Errorf("DebugVars %v not incremented", debugVarPath)
}
11 changes: 10 additions & 1 deletion go/vt/vttablet/tabletserver/connpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,21 @@ func (cp *Pool) Open(appParams, dbaParams, appDebugParams *mysql.ConnParams) {
f := func() (pools.Resource, error) {
return NewDBConn(cp, appParams)
}
cp.connections = pools.NewResourcePool(f, cp.capacity, cp.capacity, cp.idleTimeout, cp.prefillParallelism)
cp.connections = pools.NewResourcePool(f, cp.capacity, cp.capacity, cp.idleTimeout, cp.prefillParallelism, cp.getLogWaitCallback())
cp.appDebugParams = appDebugParams

cp.dbaPool.Open(dbaParams, tabletenv.MySQLStats)
}

// getLogWaitCallback returns the wait callback that Open passes to the
// resource pool. When the pool has no name the callback ignores its
// argument; otherwise it passes the wait's start time to
// tabletenv.WaitStats.Record under the key "<name>ResourceWaitTime".
func (cp *Pool) getLogWaitCallback() func(time.Time) {
	if len(cp.name) == 0 {
		// Unnamed pool: nothing is recorded.
		return func(time.Time) {}
	}
	return func(waitStart time.Time) {
		// The name is read each time the callback fires.
		statKey := cp.name + "ResourceWaitTime"
		tabletenv.WaitStats.Record(statKey, waitStart)
	}
}

// Close will close the pool and wait for connections to be returned before
// exiting.
func (cp *Pool) Close() {
Expand Down