Skip to content

Commit

Permalink
DNM: apply fix in #101786
Browse files Browse the repository at this point in the history
Epic: none

Release note: None
  • Loading branch information
renatolabs committed Apr 26, 2023
1 parent 90553ff commit 8e27836
Showing 1 changed file with 37 additions and 18 deletions.
55 changes: 37 additions & 18 deletions pkg/workload/pgx_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,6 @@ func (m *MultiConnPool) WarmupConns(ctx context.Context, numConns int) error {
return nil
}

// NOTE(seanc@): see context cancellation note below.
warmupCtx, cancel := context.WithCancel(ctx)
defer cancel()

// "Warm up" the pools so we don't have to establish connections later (which
// would affect the observed latencies of the first requests, especially when
// prepared statements are used). We do this by
Expand All @@ -279,32 +275,57 @@ func (m *MultiConnPool) WarmupConns(ctx context.Context, numConns int) error {
// (128).
g.SetLimit(100)

var warmupConnsPerPool []int
type warmupPool struct {
maxConns int
pool *pgxpool.Pool
}
warmupPools := make([]warmupPool, len(m.Pools))
if numConns == 0 {
warmupConnsPerPool = make([]int, len(m.Pools))
for i, p := range m.Pools {
warmupConnsPerPool[i] = int(p.Config().MaxConns)
warmupPools[i].maxConns = int(p.Config().MaxConns)
warmupPools[i].pool = p
}
} else {
warmupConnsPerPool = distribute(numConns, len(m.Pools))
numPoolConns := distribute(numConns, len(m.Pools))
for i, p := range m.Pools {
poolMaxConns := int(p.Config().MaxConns)
if warmupConnsPerPool[i] > poolMaxConns {
warmupConnsPerPool[i] = poolMaxConns
warmupPools[i].pool = p
warmupPools[i].maxConns = numPoolConns[i]
maxPoolConns := int(p.Config().MaxConns)
if warmupPools[i].maxConns > maxPoolConns {
warmupPools[i].maxConns = maxPoolConns
}
}
}

var numWarmupConns int
for _, n := range warmupConnsPerPool {
numWarmupConns += n
for i, p := range warmupPools {
numWarmupConns += p.maxConns
if p.maxConns <= 0 {
warmupPools[i].maxConns = 1
}
}

// NOTE(seanc@): see context cancellation note below.
// TODO(seanc@): Change WithTimeout() back to WithCancel()
const maxWarmupTime = 5 * time.Minute // NOTE(seanc@): 5min == AWS NLB TCP idle time
const minWarmupTime = 15 * time.Second
const maxTimePerConn = 200 * time.Millisecond
warmupTime := minWarmupTime
switch {
case int(warmupTime) < numWarmupConns*int(maxTimePerConn):
warmupTime = time.Duration(numWarmupConns * int(maxTimePerConn))
case warmupTime > maxWarmupTime:
warmupTime = maxWarmupTime
}
warmupCtx, cancel := context.WithTimeout(ctx, warmupTime)
defer cancel()

warmupConns := make(chan *pgxpool.Conn, numWarmupConns)
for i, p := range m.Pools {
for _, p := range warmupPools {
p := p
for k := 0; k < warmupConnsPerPool[i]; k++ {
for i := 0; i < p.maxConns; i++ {
g.Go(func() error {
conn, err := p.Acquire(warmupCtx)
conn, err := p.pool.Acquire(warmupCtx)
if err != nil {
return err
}
Expand All @@ -317,8 +338,6 @@ func (m *MultiConnPool) WarmupConns(ctx context.Context, numConns int) error {
estConns := make([]*pgxpool.Conn, 0, numWarmupConns)
defer func() {
for _, conn := range estConns {
// NOTE(seanc@): Release() connections before canceling the warmupCtx to
// prevent partially established connections from being Acquire()'ed.
conn.Release()
}
}()
Expand Down

0 comments on commit 8e27836

Please sign in to comment.