Skip to content

Commit

Permalink
Added resource pool wait time histogram metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd Cabancla <[email protected]>
  • Loading branch information
lcabancla committed Feb 13, 2020
1 parent e9a6cb0 commit 5d172c4
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 21 deletions.
11 changes: 10 additions & 1 deletion go/pools/resource_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync"
"time"

"vitess.io/vitess/go/stats"

"golang.org/x/net/context"

"vitess.io/vitess/go/sync2"
Expand Down Expand Up @@ -71,6 +73,8 @@ type ResourcePool struct {
resources chan resourceWrapper
factory Factory
idleTimer *timer.Timer
waitStats *stats.Timings
name string
}

type resourceWrapper struct {
Expand All @@ -89,16 +93,18 @@ type resourceWrapper struct {
// An idleTimeout of 0 means that there is no timeout.
// A non-zero value of prefillParallelism causes the pool to be pre-filled.
// The value specifies how many resources can be opened in parallel.
func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int) *ResourcePool {
func NewResourcePool(name string, factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int, waitStats *stats.Timings) *ResourcePool {
if capacity <= 0 || maxCap <= 0 || capacity > maxCap {
panic(errors.New("invalid/out of range capacity"))
}
rp := &ResourcePool{
name: name,
resources: make(chan resourceWrapper, maxCap),
factory: factory,
available: sync2.NewAtomicInt64(int64(capacity)),
capacity: sync2.NewAtomicInt64(int64(capacity)),
idleTimeout: sync2.NewAtomicDuration(idleTimeout),
waitStats: waitStats,
}
for i := 0; i < capacity; i++ {
rp.resources <- resourceWrapper{}
Expand Down Expand Up @@ -325,6 +331,9 @@ func (rp *ResourcePool) SetCapacity(capacity int) error {
func (rp *ResourcePool) recordWait(start time.Time) {
rp.waitCount.Add(1)
rp.waitTime.Add(time.Since(start))
if rp.name != "" {
rp.waitStats.Record(rp.name+"ResourceWaitTime", start)
}
}

// SetIdleTimeout sets the idle timeout. It can only be used if there was an
Expand Down
29 changes: 16 additions & 13 deletions go/pools/resource_pool_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import (
"testing"
"time"

"vitess.io/vitess/go/stats"

"golang.org/x/net/context"
"vitess.io/vitess/go/sync2"
)

var lastID, count sync2.AtomicInt64
var waitTimes = stats.NewTimings("", "", "")

type TestResource struct {
num int64
Expand Down Expand Up @@ -57,7 +60,7 @@ func TestOpen(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 6, 6, time.Second, 0)
p := NewResourcePool("", PoolFactory, 6, 6, time.Second, 0, waitTimes)
p.SetCapacity(5)
var resources [10]Resource

Expand Down Expand Up @@ -198,12 +201,12 @@ func TestOpen(t *testing.T) {
func TestPrefill(t *testing.T) {
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 1)
p := NewResourcePool("", PoolFactory, 5, 5, time.Second, 1, waitTimes)
defer p.Close()
if p.Active() != 5 {
t.Errorf("p.Active(): %d, want 5", p.Active())
}
p = NewResourcePool(FailFactory, 5, 5, time.Second, 1)
p = NewResourcePool("", FailFactory, 5, 5, time.Second, 1, waitTimes)
defer p.Close()
if p.Active() != 0 {
t.Errorf("p.Active(): %d, want 0", p.Active())
Expand All @@ -218,7 +221,7 @@ func TestPrefillTimeout(t *testing.T) {
defer func() { prefillTimeout = saveTimeout }()

start := time.Now()
p := NewResourcePool(SlowFailFactory, 5, 5, time.Second, 1)
p := NewResourcePool("", SlowFailFactory, 5, 5, time.Second, 1, waitTimes)
defer p.Close()
if elapsed := time.Since(start); elapsed > 20*time.Millisecond {
t.Errorf("elapsed: %v, should be around 10ms", elapsed)
Expand All @@ -232,7 +235,7 @@ func TestShrinking(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0)
p := NewResourcePool("", PoolFactory, 5, 5, time.Second, 0, waitTimes)
var resources [10]Resource
// Leave one empty slot in the pool
for i := 0; i < 4; i++ {
Expand Down Expand Up @@ -371,7 +374,7 @@ func TestClosing(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0)
p := NewResourcePool("", PoolFactory, 5, 5, time.Second, 0, waitTimes)
var resources [10]Resource
for i := 0; i < 5; i++ {
r, err := p.Get(ctx)
Expand Down Expand Up @@ -425,7 +428,7 @@ func TestIdleTimeout(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0)
p := NewResourcePool("", PoolFactory, 1, 1, 10*time.Millisecond, 0, waitTimes)
defer p.Close()

r, err := p.Get(ctx)
Expand Down Expand Up @@ -536,7 +539,7 @@ func TestIdleTimeoutCreateFail(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0)
p := NewResourcePool("", PoolFactory, 1, 1, 10*time.Millisecond, 0, waitTimes)
defer p.Close()
r, err := p.Get(ctx)
if err != nil {
Expand All @@ -557,7 +560,7 @@ func TestCreateFail(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(FailFactory, 5, 5, time.Second, 0)
p := NewResourcePool("", FailFactory, 5, 5, time.Second, 0, waitTimes)
defer p.Close()
if _, err := p.Get(ctx); err.Error() != "Failed" {
t.Errorf("Expecting Failed, received %v", err)
Expand All @@ -573,7 +576,7 @@ func TestCreateFailOnPut(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0)
p := NewResourcePool("", PoolFactory, 5, 5, time.Second, 0, waitTimes)
defer p.Close()
_, err := p.Get(ctx)
if err != nil {
Expand All @@ -590,7 +593,7 @@ func TestSlowCreateFail(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(SlowFailFactory, 2, 2, time.Second, 0)
p := NewResourcePool("", SlowFailFactory, 2, 2, time.Second, 0, waitTimes)
defer p.Close()
ch := make(chan bool)
// The third Get should not wait indefinitely
Expand All @@ -612,7 +615,7 @@ func TestTimeout(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0)
p := NewResourcePool("", PoolFactory, 1, 1, time.Second, 0, waitTimes)
defer p.Close()
r, err := p.Get(ctx)
if err != nil {
Expand All @@ -631,7 +634,7 @@ func TestTimeout(t *testing.T) {
func TestExpired(t *testing.T) {
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0)
p := NewResourcePool("", PoolFactory, 1, 1, time.Second, 0, waitTimes)
defer p.Close()
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second))
r, err := p.Get(ctx)
Expand Down
5 changes: 3 additions & 2 deletions go/vt/dbconnpool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,13 @@ type ConnectionPool struct {
hostIsNotIP bool

mysqlStats *stats.Timings
name string
}

// NewConnectionPool creates a new ConnectionPool. The name is used
// to publish stats only.
func NewConnectionPool(name string, capacity int, idleTimeout time.Duration, dnsResolutionFrequency time.Duration) *ConnectionPool {
cp := &ConnectionPool{capacity: capacity, idleTimeout: idleTimeout, resolutionFrequency: dnsResolutionFrequency}
cp := &ConnectionPool{name: name, capacity: capacity, idleTimeout: idleTimeout, resolutionFrequency: dnsResolutionFrequency}
if name == "" || usedNames[name] {
return cp
}
Expand Down Expand Up @@ -146,7 +147,7 @@ func (cp *ConnectionPool) Open(info *mysql.ConnParams, mysqlStats *stats.Timings
defer cp.mu.Unlock()
cp.info = info
cp.mysqlStats = mysqlStats
cp.connections = pools.NewResourcePool(cp.connect, cp.capacity, cp.capacity, cp.idleTimeout, 0)
cp.connections = pools.NewResourcePool(cp.name, cp.connect, cp.capacity, cp.capacity, cp.idleTimeout, 0, mysqlStats)
// Check if we need to resolve a hostname (The Host is not just an IP address).
if cp.resolutionFrequency > 0 && net.ParseIP(info.Host) == nil {
cp.hostIsNotIP = true
Expand Down
4 changes: 0 additions & 4 deletions go/vt/vttablet/endtoend/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,6 @@ func TestConsolidation(t *testing.T) {
wg.Wait()

vend := framework.DebugVars()
if err := compareIntDiff(vend, "Waits/TotalCount", vstart, 1); err != nil {
t.Logf("DebugVars Waits/TotalCount not incremented with sleep=%v", sleep)
continue
}
if err := compareIntDiff(vend, "Waits/Histograms/Consolidations/Count", vstart, 1); err != nil {
t.Logf("DebugVars Waits/Histograms/Consolidations/Count not incremented with sleep=%v", sleep)
continue
Expand Down
42 changes: 42 additions & 0 deletions go/vt/vttablet/endtoend/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"reflect"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -823,3 +824,44 @@ func TestManualTwopcz(t *testing.T) {
fmt.Print("Sleeping for 30 seconds\n")
time.Sleep(30 * time.Second)
}

func TestTransactionPoolResourceWaitTime(t *testing.T) {
defer framework.Server.SetPoolSize(framework.Server.TxPoolSize())
defer framework.Server.SetTxPoolTimeout(framework.Server.TxPoolTimeout())
framework.Server.SetTxPoolSize(1)
framework.Server.SetTxPoolTimeout(10 * time.Second)
debugVarPath := "Waits/Histograms/TransactionPoolResourceWaitTime/Count"

for sleep := 0.1; sleep < 10.0; sleep *= 2 {
vstart := framework.DebugVars()
var wg sync.WaitGroup
wg.Add(2)

transactionFunc := func() {
client := framework.NewClient()

bv := map[string]*querypb.BindVariable{}
query := fmt.Sprintf("select sleep(%v) from dual", sleep)
if _, err := client.BeginExecute(query, bv); err != nil {
t.Error(err)
return
}
if err := client.Rollback(); err != nil {
t.Error(err)
return
}
wg.Done()
}
go transactionFunc()
go transactionFunc()
wg.Wait()
vend := framework.DebugVars()
if err := compareIntDiff(vend, debugVarPath, vstart, 1); err != nil {
t.Logf("DebugVars %v not incremented with sleep=%v", debugVarPath, sleep)
continue
}
t.Logf("DebugVars %v properly incremented with sleep=%v", debugVarPath, sleep)
return
}
t.Errorf("DebugVars %v not incremented", debugVarPath)
}
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/connpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (cp *Pool) Open(appParams, dbaParams, appDebugParams *mysql.ConnParams) {
f := func() (pools.Resource, error) {
return NewDBConn(cp, appParams)
}
cp.connections = pools.NewResourcePool(f, cp.capacity, cp.capacity, cp.idleTimeout, cp.prefillParallelism)
cp.connections = pools.NewResourcePool(cp.name, f, cp.capacity, cp.capacity, cp.idleTimeout, cp.prefillParallelism, tabletenv.WaitStats)
cp.appDebugParams = appDebugParams

cp.dbaPool.Open(dbaParams, tabletenv.MySQLStats)
Expand Down

0 comments on commit 5d172c4

Please sign in to comment.