Skip to content

Commit

Permalink
Merge #36913
Browse files Browse the repository at this point in the history
36913: workload/tpcc: limit parallelism for worker initialization r=RaduBerinde a=RaduBerinde

Add a semaphore to avoid initializing all workers in parallel - for
10k warehouses, we have 100k workers and that can lead to OOM.

Fixes #36897.

Release note: None

Co-authored-by: Radu Berinde <[email protected]>
  • Loading branch information
craig[bot] and RaduBerinde committed Apr 18, 2019
2 parents 83de585 + dcc290d commit b47269e
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 2 deletions.
4 changes: 2 additions & 2 deletions pkg/workload/pgx_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func NewMultiConnPool(cfg MultiConnPoolCfg, urls ...string) (*MultiConnPool, err
// pool.
var g errgroup.Group
// Limit concurrent connection establishment. Allowing this to run
// at maximum parallism would trigger syn flood protection on the
// at maximum parallelism would trigger syn flood protection on the
// host, which combined with any packet loss could cause Acquire to
// return an error and fail the whole function. The value 100 is
// chosen because it is less than the default value for SOMAXCONN
Expand All @@ -102,8 +102,8 @@ func NewMultiConnPool(cfg MultiConnPoolCfg, urls ...string) (*MultiConnPool, err
conns := warmupConns[i]
for j := range conns {
j := j
sem <- struct{}{}
g.Go(func() error {
sem <- struct{}{}
var err error
conns[j], err = p.Acquire()
<-sem
Expand Down
5 changes: 5 additions & 0 deletions pkg/workload/tpcc/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,9 @@ func (w *tpcc) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad,
ql := workload.QueryLoad{SQLDatabase: sqlDatabase}
ql.WorkerFns = make([]func(context.Context) error, w.workers)
var group errgroup.Group
// Limit the amount of workers we initialize in parallel, to avoid running out
// of memory (#36897).
sem := make(chan struct{}, 100)
for workerIdx := range ql.WorkerFns {
workerIdx := workerIdx
warehouse := w.wPart.totalElems[workerIdx%len(w.wPart.totalElems)]
Expand All @@ -567,11 +570,13 @@ func (w *tpcc) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad,
dbs := partitionDBs[p]
db := dbs[warehouse%len(dbs)]

sem <- struct{}{}
group.Go(func() error {
worker, err := newWorker(context.TODO(), w, db, reg.GetHandle(), warehouse)
if err == nil {
ql.WorkerFns[workerIdx] = worker.run
}
<-sem
return err
})
}
Expand Down

0 comments on commit b47269e

Please sign in to comment.