Skip to content

Commit

Permalink
pkg/util: Add concurrent package
Browse files Browse the repository at this point in the history
`concurrent` package implements various primitives
around asynchronous execution.

`concurrent.Executor` defines an executor that can
execute functions.

Of course, Go has a perfectly good `go func()` mechanism
to execute concurrent code.  However, sometimes the caller
wants to avoid spinning up many goroutines in a short burst.
Doing so tends to negatively impact the Go runtime and cause
spikes in latency.  `concurrent.NewWorkQueue` implements
a mechanism whereby the caller may create a work queue --
a queue of closures -- that will run on a bounded
number of worker goroutines.

Release note: None
  • Loading branch information
Yevgeniy Miretskiy committed Apr 7, 2023
1 parent 64fef1a commit 3e144d7
Show file tree
Hide file tree
Showing 4 changed files with 321 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,7 @@ ALL_TESTS = [
"//pkg/util/cgroups:cgroups_test",
"//pkg/util/circuit:circuit_test",
"//pkg/util/cloudinfo:cloudinfo_test",
"//pkg/util/concurrent:concurrent_test",
"//pkg/util/contextutil:contextutil_test",
"//pkg/util/ctxgroup:ctxgroup_test",
"//pkg/util/duration:duration_test",
Expand Down Expand Up @@ -2123,6 +2124,8 @@ GO_TARGETS = [
"//pkg/util/cloudinfo:cloudinfo",
"//pkg/util/cloudinfo:cloudinfo_test",
"//pkg/util/collatedstring:collatedstring",
"//pkg/util/concurrent:concurrent",
"//pkg/util/concurrent:concurrent_test",
"//pkg/util/contextutil:contextutil",
"//pkg/util/contextutil:contextutil_test",
"//pkg/util/ctxgroup:ctxgroup",
Expand Down Expand Up @@ -3257,6 +3260,7 @@ GET_X_DATA_TARGETS = [
"//pkg/util/circuit:get_x_data",
"//pkg/util/cloudinfo:get_x_data",
"//pkg/util/collatedstring:get_x_data",
"//pkg/util/concurrent:get_x_data",
"//pkg/util/contextutil:get_x_data",
"//pkg/util/ctxgroup:get_x_data",
"//pkg/util/duration:get_x_data",
Expand Down
28 changes: 28 additions & 0 deletions pkg/util/concurrent/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

# Library target for the concurrent package. Its only source file is
# executor.go; the deps listed here match the non-stdlib packages that
# executor.go imports (queue, stop, syncutil).
go_library(
name = "concurrent",
srcs = ["executor.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/util/concurrent",
visibility = ["//visibility:public"],
deps = [
"//pkg/util/queue",
"//pkg/util/stop",
"//pkg/util/syncutil",
],
)

# Test target for the concurrent package, built from executor_test.go.
# The test binary is invoked with an explicit -test.timeout of 295s.
go_test(
name = "concurrent_test",
srcs = ["executor_test.go"],
args = ["-test.timeout=295s"],
deps = [
":concurrent",
"//pkg/util/leaktest",
"//pkg/util/stop",
"@com_github_stretchr_testify//require",
],
)

get_x_data(name = "get_x_data")
180 changes: 180 additions & 0 deletions pkg/util/concurrent/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
//

package concurrent

import (
"context"
"fmt"
"runtime/pprof"
"sync"

"github.com/cockroachdb/cockroach/pkg/util/queue"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// Executor is an interface describing asynchronous function
// execution -- much like the go keyword.
//
// Executor does not provide any execution order guarantees.
type Executor interface {
// GoCtx arranges for f to be executed, passing it a context
// based on ctx. Implementations in this package do not wait
// for f to run before returning.
GoCtx(ctx context.Context, f func(ctx context.Context))
}

// DefaultExecutor is the default Executor. It simply runs every
// submitted function on its own, newly started goroutine.
var DefaultExecutor = sysExecutor{}

// Option is an Executor configuration option. Options are handed to
// NewWorkQueue, which applies them in the order given.
type Option interface {
// apply records this option's setting in cfg.
apply(cfg *config)
}

// NewWorkQueue returns an Executor that queues work items.
//
// Work queue is suitable in cases where the caller has potentially many
// blocking operations that it wants to perform asynchronously.
// If this sounds like a "go func(){...}" -- it is. The problem here is that
// if the number of such go functions is very large, and they happen in bursts,
// the creation of many goroutines will put load on the Go runtime, and this
// in turn causes elevated latencies for other goroutines (i.e. there is
// an impact on foreground SQL latency).
//
// To address unbounded bursts of goroutine creation, we instead utilize a
// fixed number of workers to bound the parallelism.
//
// Work queue implements a standard Go fan-out pattern. It differs from the
// standard implementation in that it does not rely on bounded buffered
// channels to submit the work to the executors. Instead, it has a queue, which
// fronts one or more executor workers.
//
// The number of workers is configured with WithNumWorkers; if it is not set
// (or is not positive), a single worker is used. name is embedded in the task
// names and pprof labels of the goroutines started here.
//
// All closures submitted to this queue will execute at some point. However, if
// the stopper is signaled to quiesce, those closures will execute with their
// context canceled.
//
// An error is returned if the stopper fails to start any of the queue's
// goroutines. In that case the workers that were already started are shut
// down before returning, so a failed call leaves no goroutine behind.
func NewWorkQueue(
	ctx context.Context, name string, stopper *stop.Stopper, opts ...Option,
) (Executor, error) {
	var cfg config
	for _, o := range opts {
		o.apply(&cfg)
	}

	if cfg.numWorkers <= 0 {
		cfg.numWorkers = 1
	}

	wq := workQueue{withCancelOnQuiesce: stopper.WithCancelOnQuiesce}
	wq.mu.cv = sync.NewCond(&wq.mu.Mutex)

	for i := 0; i < cfg.numWorkers; i++ {
		workerName := fmt.Sprintf("wq-%s-%d", name, i)
		// Count each worker right before it is started (worker calls wg.Done
		// when it returns) so that wg never includes workers that were never
		// launched.
		wq.wg.Add(1)
		if err := stopper.RunAsyncTask(ctx, workerName, func(ctx context.Context) {
			pprof.Do(ctx, pprof.Labels("name", workerName), wq.worker)
		}); err != nil {
			// This worker did not start: undo its accounting, then stop the
			// workers launched so far. Without this they would stay parked on
			// the condition variable forever, since nothing else would ever
			// call stop.
			wq.wg.Done()
			wq.stop()
			return nil, err
		}
	}

	stopperName := fmt.Sprintf("wq-%s-stopper", name)
	if err := stopper.RunAsyncTask(ctx, stopperName, func(ctx context.Context) {
		// Label this goroutine for the duration of the task and restore the
		// labels carried by ctx on the way out.
		defer pprof.SetGoroutineLabels(ctx)
		pprof.SetGoroutineLabels(pprof.WithLabels(ctx, pprof.Labels("name", stopperName)))

		<-stopper.ShouldQuiesce()
		wq.stop()
	}); err != nil {
		// Nobody will observe ShouldQuiesce on behalf of this queue, so shut
		// the already running workers down here.
		wq.stop()
		return nil, err
	}
	return &wq, nil
}

// WithNumWorkers returns an Option that sets how many worker goroutines a
// work queue uses to process submitted callbacks.
func WithNumWorkers(n int) Option {
	setNumWorkers := func(cfg *config) { cfg.numWorkers = n }
	return funcOpt(setNumWorkers)
}

// workQueue is the Executor returned by NewWorkQueue. Submitted closures are
// appended to an in-memory queue and executed by the worker goroutines.
type workQueue struct {
// wg tracks the worker goroutines: every worker calls Done when it
// returns, and stop waits on it.
wg sync.WaitGroup
// withCancelOnQuiesce derives the context handed to each submitted
// closure. NewWorkQueue sets it to the stopper's WithCancelOnQuiesce.
withCancelOnQuiesce func(ctx context.Context) (context.Context, func())

// mu guards the fields declared inside it.
mu struct {
syncutil.Mutex
// cv is bound to the mutex above. It is signaled when work is pushed
// and broadcast when the queue is stopped.
cv *sync.Cond
// stop is set by stop(); a worker returns once stop is set and the
// queue is empty.
stop bool
// work holds the closures waiting to be executed.
work queue.Queue[func()]
}
}

// GoCtx implements the Executor interface. It appends f to the queue and wakes
// up one worker; it does not wait for f to run.
//
// f is invoked with a context derived from ctx through withCancelOnQuiesce.
// The cancel function that comes with the derived context is called as soon
// as f returns, so whatever the derivation registered is released promptly
// rather than being retained until the parent context or the stopper is done.
//
// NOTE(review): workers return once the queue has been stopped and drained;
// a closure submitted after that point is queued but nothing pops it.
func (q *workQueue) GoCtx(ctx context.Context, f func(ctx context.Context)) {
	ctx, cancel := q.withCancelOnQuiesce(ctx)
	task := func() {
		defer cancel()
		f(ctx)
	}

	q.mu.Lock()
	defer q.mu.Unlock()
	q.mu.work.Push(task)
	q.mu.cv.Signal()
}

// stop shuts the queue down: it marks the queue as stopped, wakes every
// worker, and blocks until all workers have returned.
//
// Closures that are still queued are not discarded here; each worker keeps
// popping until the queue is empty and only then honors the stop flag.
func (q *workQueue) stop() {
	q.mu.Lock()
	q.mu.stop = true
	q.mu.cv.Broadcast()
	q.mu.Unlock()

	// Wait outside the critical section: workers need the mutex to observe
	// the stop flag and exit.
	q.wg.Wait()
}

// worker is the body of a single worker goroutine. It repeatedly takes the
// next closure off the queue and runs it with the mutex released. When the
// queue is empty it sleeps on the condition variable, and it returns once the
// queue is both empty and marked as stopped. ctx is not consulted; the
// parameter is what lets worker be passed directly to pprof.Do.
func (q *workQueue) worker(ctx context.Context) {
	defer q.wg.Done()

	for {
		q.mu.Lock()
		task, found := q.mu.work.PopFront()
		for !found {
			if q.mu.stop {
				// Nothing left to run and shutdown was requested.
				q.mu.Unlock()
				return
			}
			// Wait releases the mutex while sleeping and re-acquires it
			// before returning, so the queue is re-checked under the lock.
			q.mu.cv.Wait()
			task, found = q.mu.work.PopFront()
		}
		q.mu.Unlock()

		task()
	}
}

// funcOpt adapts a plain function to the Option interface.
type funcOpt func(cfg *config)

// apply implements Option by invoking the wrapped function on cfg.
func (f funcOpt) apply(cfg *config) {
f(cfg)
}

// config collects the settings that Options can adjust.
type config struct {
// numWorkers is the number of worker goroutines a work queue starts.
// NewWorkQueue substitutes 1 when the value is not positive.
numWorkers int
}

// sysExecutor is the Executor behind DefaultExecutor. It carries no state and
// hands every function straight to the Go scheduler.
type sysExecutor struct{}

// GoCtx implements the Executor interface by calling f(ctx) on a new
// goroutine.
func (sysExecutor) GoCtx(ctx context.Context, f func(ctx context.Context)) {
	go f(ctx)
}
109 changes: 109 additions & 0 deletions pkg/util/concurrent/executor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package concurrent_test

import (
"context"
"fmt"
"sync"
"testing"

"github.com/cockroachdb/cockroach/pkg/util/concurrent"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/stretchr/testify/require"
)

// TestDefaultExecutor submits a batch of closures to DefaultExecutor, each of
// which blocks until a shared channel is closed, and verifies that all of them
// complete once released.
func TestDefaultExecutor(t *testing.T) {
	defer leaktest.AfterTest(t)()

	const numClosures = 1024

	var pending sync.WaitGroup
	pending.Add(numClosures)

	release := make(chan struct{})
	blockThenFinish := func(ctx context.Context) {
		<-release
		pending.Done()
	}
	for submitted := 0; submitted < numClosures; submitted++ {
		concurrent.DefaultExecutor.GoCtx(context.Background(), blockThenFinish)
	}

	close(release)
	pending.Wait()
}

// TestWorkQueue creates work queues configured with 0 through 16 workers on a
// shared stopper and checks, for each, that every submitted closure runs.
// The closures block until a channel is closed after all of them have been
// submitted, so submission has to succeed while workers are still busy.
func TestWorkQueue(t *testing.T) {
	defer leaktest.AfterTest(t)()

	const numClosures = 1024
	ctx := context.Background()
	stopper := stop.NewStopper()
	defer stopper.Stop(ctx)

	// runWithWorkers exercises one work queue configured with the given
	// number of workers.
	runWithWorkers := func(t *testing.T, workers int) {
		ex, err := concurrent.NewWorkQueue(
			ctx, fmt.Sprintf("n%d", workers), stopper, concurrent.WithNumWorkers(workers),
		)
		require.NoError(t, err)

		var pending sync.WaitGroup
		pending.Add(numClosures)
		release := make(chan struct{})
		blockThenFinish := func(ctx context.Context) {
			<-release
			pending.Done()
		}
		for submitted := 0; submitted < numClosures; submitted++ {
			ex.GoCtx(context.Background(), blockThenFinish)
		}

		close(release)
		pending.Wait()
	}

	for n := 0; n <= 16; n++ {
		workers := n
		t.Run(fmt.Sprintf("n=%d", workers), func(t *testing.T) {
			runWithWorkers(t, workers)
		})
	}
}

// BenchmarkExecutors measures the cost of submitting b.N blocking closures and
// waiting for all of them to finish. It runs one sub-benchmark for
// DefaultExecutor and one for each work queue with 1, 2, 4, ..., 64 workers.
func BenchmarkExecutors(b *testing.B) {
	stopper := stop.NewStopper()
	ctx := context.Background()
	defer stopper.Stop(ctx)

	// numWorkers == 0 selects DefaultExecutor; the loop then advances through
	// the powers of two. The increment lives in the body because the first
	// step is +1 and the remaining ones are *2.
	for numWorkers := 0; numWorkers <= 64; {
		var ex concurrent.Executor
		var benchName string
		if numWorkers == 0 {
			ex = concurrent.DefaultExecutor
			benchName = "DefaultExecutor"
			numWorkers++
		} else {
			var err error
			benchName = fmt.Sprintf("WQ=%d", numWorkers)
			ex, err = concurrent.NewWorkQueue(ctx, benchName, stopper, concurrent.WithNumWorkers(numWorkers))
			if err != nil {
				b.Fatal(err)
			}
			numWorkers *= 2
		}

		b.Run(benchName, func(b *testing.B) {
			testCh := make(chan struct{})
			var wg sync.WaitGroup
			wg.Add(b.N)

			// Reset the timer of the sub-benchmark itself, after its setup;
			// resetting the parent's timer does not affect what is reported
			// for the sub-benchmark.
			b.ResetTimer()
			for n := 0; n < b.N; n++ {
				ex.GoCtx(context.Background(), func(ctx context.Context) {
					<-testCh
					wg.Done()
				})
			}
			close(testCh)
			wg.Wait()
		})
	}
}

0 comments on commit 3e144d7

Please sign in to comment.