From 3e144d750f4d98fda7c0b47bc72f00d55dad6428 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Fri, 7 Apr 2023 12:21:36 -0400 Subject: [PATCH] pkg/util: Add concurrent package `concurrent` package implements various primitives around asynchronous execution. `concurrent.Executor` defines an executor that can execute functions. Of course, Go has a perfectly good `go func()` mechanism to execute concurrent code. However, sometimes the caller wants to avoid spinning up many goroutines in short bursts. Doing so tends to negatively impact the Go runtime and cause spikes in latency. `concurrent.NewWorkQueue` implements a mechanism whereby the caller may create a work queue -- a queue of closures -- that will run on a bounded number of worker goroutines. Release note: None --- pkg/BUILD.bazel | 4 + pkg/util/concurrent/BUILD.bazel | 28 +++++ pkg/util/concurrent/executor.go | 180 +++++++++++++++++++++++++++ pkg/util/concurrent/executor_test.go | 109 ++++++++++++++++ 4 files changed, 321 insertions(+) create mode 100644 pkg/util/concurrent/BUILD.bazel create mode 100644 pkg/util/concurrent/executor.go create mode 100644 pkg/util/concurrent/executor_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 1adda511c538..315d0c3f45f3 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -594,6 +594,7 @@ ALL_TESTS = [ "//pkg/util/cgroups:cgroups_test", "//pkg/util/circuit:circuit_test", "//pkg/util/cloudinfo:cloudinfo_test", + "//pkg/util/concurrent:concurrent_test", "//pkg/util/contextutil:contextutil_test", "//pkg/util/ctxgroup:ctxgroup_test", "//pkg/util/duration:duration_test", @@ -2123,6 +2124,8 @@ GO_TARGETS = [ "//pkg/util/cloudinfo:cloudinfo", "//pkg/util/cloudinfo:cloudinfo_test", "//pkg/util/collatedstring:collatedstring", + "//pkg/util/concurrent:concurrent", + "//pkg/util/concurrent:concurrent_test", "//pkg/util/contextutil:contextutil", "//pkg/util/contextutil:contextutil_test", "//pkg/util/ctxgroup:ctxgroup", @@ -3257,6 +3260,7 @@ GET_X_DATA_TARGETS = [
"//pkg/util/circuit:get_x_data", "//pkg/util/cloudinfo:get_x_data", "//pkg/util/collatedstring:get_x_data", + "//pkg/util/concurrent:get_x_data", "//pkg/util/contextutil:get_x_data", "//pkg/util/ctxgroup:get_x_data", "//pkg/util/duration:get_x_data", diff --git a/pkg/util/concurrent/BUILD.bazel b/pkg/util/concurrent/BUILD.bazel new file mode 100644 index 000000000000..ec24781e9369 --- /dev/null +++ b/pkg/util/concurrent/BUILD.bazel @@ -0,0 +1,28 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "concurrent", + srcs = ["executor.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/util/concurrent", + visibility = ["//visibility:public"], + deps = [ + "//pkg/util/queue", + "//pkg/util/stop", + "//pkg/util/syncutil", + ], +) + +go_test( + name = "concurrent_test", + srcs = ["executor_test.go"], + args = ["-test.timeout=295s"], + deps = [ + ":concurrent", + "//pkg/util/leaktest", + "//pkg/util/stop", + "@com_github_stretchr_testify//require", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/util/concurrent/executor.go b/pkg/util/concurrent/executor.go new file mode 100644 index 000000000000..753f929449b7 --- /dev/null +++ b/pkg/util/concurrent/executor.go @@ -0,0 +1,180 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+//
+
+package concurrent
+
+import (
+	"context"
+	"fmt"
+	"runtime/pprof"
+	"sync"
+
+	"github.com/cockroachdb/cockroach/pkg/util/queue"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+)
+
+// Executor is an interface describing asynchronous
+// function execution -- much like the go keyword.
+// Executor does not provide any execution order guarantees.
+type Executor interface {
+	// GoCtx executes the provided function with the specified context.
+	GoCtx(ctx context.Context, f func(ctx context.Context))
+}
+
+// DefaultExecutor is the default executor which simply
+// executes functions on a new goroutine.
+var DefaultExecutor = sysExecutor{}
+
+// Option is an Executor configuration option.
+type Option interface {
+	apply(cfg *config)
+}
+
+// NewWorkQueue returns an Executor that queues work items.
+//
+// A work queue suits callers with potentially many blocking operations to
+// perform asynchronously. That sounds like "go func(){...}" -- and it is.
+// However, creating a very large number of goroutines in bursts puts load on
+// the Go runtime, which in turn elevates latencies for other goroutines
+// (i.e. there is an impact on foreground SQL latency).
+//
+// To avoid unbounded bursts of goroutine creation, the queue uses a fixed
+// number of workers to bound the parallelism. Unlike the standard Go fan-out
+// pattern, it does not rely on bounded buffered channels to submit work:
+// a queue fronts one or more workers. If the number of workers is not set
+// (or is not positive), a single worker is used.
+//
+// All closures submitted to this queue will execute at some point. However, if
+// the stopper is signaled to quiesce, those closures will execute with their
+// context canceled. If a task cannot be started, NewWorkQueue shuts down the
+// workers it already started and returns the error.
+func NewWorkQueue(
+	ctx context.Context, name string, stopper *stop.Stopper, opts ...Option,
+) (Executor, error) {
+	var cfg config
+	for _, o := range opts {
+		o.apply(&cfg)
+	}
+
+	if cfg.numWorkers <= 0 {
+		cfg.numWorkers = 1
+	}
+
+	wq := workQueue{withCancelOnQuiesce: stopper.WithCancelOnQuiesce}
+	wq.mu.cv = sync.NewCond(&wq.mu.Mutex)
+
+	for i := 0; i < cfg.numWorkers; i++ {
+		workerName := fmt.Sprintf("wq-%s-%d", name, i)
+		wq.wg.Add(1) // balanced by the worker's Done, or just below if it never starts
+		if err := stopper.RunAsyncTask(ctx, workerName, func(ctx context.Context) {
+			pprof.Do(ctx, pprof.Labels("name", workerName), wq.worker)
+		}); err != nil {
+			wq.wg.Done()
+			wq.stop() // release the workers that did start; nothing else will
+			return nil, err
+		}
+	}
+
+	stopperName := fmt.Sprintf("wq-%s-stopper", name)
+	if err := stopper.RunAsyncTask(ctx, stopperName, func(ctx context.Context) {
+		defer pprof.SetGoroutineLabels(ctx)
+		pprof.SetGoroutineLabels(pprof.WithLabels(ctx, pprof.Labels("name", stopperName)))
+
+		<-stopper.ShouldQuiesce()
+		wq.stop()
+	}); err != nil {
+		wq.stop() // no task is left to stop the workers on quiesce, so do it here
+		return nil, err
+	}
+	return &wq, nil
+}
+
+// WithNumWorkers returns an option to configure the number of workers used to
+// process callbacks.
+func WithNumWorkers(n int) Option {
+	return funcOpt(func(cfg *config) {
+		cfg.numWorkers = n
+	})
+}
+
+// workQueue is the Executor returned by NewWorkQueue. Closures are appended to
+// mu.work by GoCtx and removed by the worker goroutines.
+type workQueue struct {
+	wg                  sync.WaitGroup
+	withCancelOnQuiesce func(ctx context.Context) (context.Context, func())
+
+	mu struct {
+		syncutil.Mutex
+		cv   *sync.Cond
+		stop bool
+		work queue.Queue[func()]
+	}
+}
+
+// GoCtx implements Executor interface.
+func (q *workQueue) GoCtx(ctx context.Context, f func(ctx context.Context)) {
+	// Run cancel after f so the derived context is released, not leaked.
+	ctx, cancel := q.withCancelOnQuiesce(ctx)
+	q.mu.Lock()
+	q.mu.work.Push(func() { defer cancel(); f(ctx) })
+	q.mu.cv.Signal()
+	q.mu.Unlock()
+}
+
+// stop marks the queue stopped, wakes every worker, and waits for all of them
+// to exit. Queued closures are left in place so that workers still run them.
+func (q *workQueue) stop() {
+	q.mu.Lock()
+	q.mu.stop = true
+	q.mu.cv.Broadcast()
+	q.mu.Unlock()
+	q.wg.Wait()
+}
+
+// worker runs queued closures one at a time, outside the lock. It exits only
+// when the queue is empty and stop has been set.
+func (q *workQueue) worker(ctx context.Context) {
+	defer q.wg.Done()
+
+	for {
+		q.mu.Lock()
+		fn, ok := q.mu.work.PopFront()
+		for !ok {
+			if q.mu.stop {
+				q.mu.Unlock()
+				return
+			}
+			q.mu.cv.Wait()
+			fn, ok = q.mu.work.PopFront()
+		}
+		q.mu.Unlock()
+
+		fn()
+	}
+}
+
+type funcOpt func(cfg *config)
+
+func (f funcOpt) apply(cfg *config) {
+	f(cfg)
+}
+
+type config struct {
+	numWorkers int
+}
+
+type sysExecutor struct{}
+
+// GoCtx implements Executor interface.
+func (sysExecutor) GoCtx(ctx context.Context, f func(ctx context.Context)) {
+	go func() { f(ctx) }()
+}
diff --git a/pkg/util/concurrent/executor_test.go b/pkg/util/concurrent/executor_test.go
new file mode 100644
index 000000000000..5b5fbe8de142
--- /dev/null
+++ b/pkg/util/concurrent/executor_test.go
@@ -0,0 +1,109 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+package concurrent_test
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/util/concurrent"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/stretchr/testify/require"
+)
+
+// TestDefaultExecutor verifies that DefaultExecutor runs every closure
+// submitted to it.
+func TestDefaultExecutor(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	const numClosures = 1024
+	release := make(chan struct{})
+
+	var pending sync.WaitGroup
+	pending.Add(numClosures)
+	for i := 0; i < numClosures; i++ {
+		concurrent.DefaultExecutor.GoCtx(context.Background(), func(context.Context) {
+			defer pending.Done()
+			<-release
+		})
+	}
+	close(release)
+	pending.Wait()
+}
+
+// TestWorkQueue verifies that work queues configured with 0 through 16
+// workers run every closure submitted to them.
+func TestWorkQueue(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	const numClosures = 1024
+	ctx := context.Background()
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+
+	for n := 0; n <= 16; n++ {
+		t.Run(fmt.Sprintf("n=%d", n), func(t *testing.T) {
+			release := make(chan struct{})
+			var pending sync.WaitGroup
+			pending.Add(numClosures)
+
+			ex, err := concurrent.NewWorkQueue(ctx, fmt.Sprintf("n%d", n), stopper, concurrent.WithNumWorkers(n))
+			require.NoError(t, err)
+
+			for i := 0; i < numClosures; i++ {
+				ex.GoCtx(context.Background(), func(context.Context) {
+					defer pending.Done()
+					<-release
+				})
+			}
+			close(release)
+			pending.Wait()
+		})
+	}
+}
+
+// BenchmarkExecutors compares DefaultExecutor against work queues with
+// 1, 2, 4, ... 64 workers.
+func BenchmarkExecutors(b *testing.B) {
+	ctx := context.Background()
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+
+	bench := func(name string, ex concurrent.Executor) {
+		b.ResetTimer()
+		b.Run(name, func(b *testing.B) {
+			release := make(chan struct{})
+			var pending sync.WaitGroup
+			pending.Add(b.N)
+
+			for i := 0; i < b.N; i++ {
+				ex.GoCtx(context.Background(), func(context.Context) {
+					<-release
+					pending.Done()
+				})
+			}
+			close(release)
+			pending.Wait()
+		})
+	}
+
+	bench("DefaultExecutor", concurrent.DefaultExecutor)
+	for numWorkers := 1; numWorkers <= 64; numWorkers *= 2 {
+		name := fmt.Sprintf("WQ=%d", numWorkers)
+		ex, err := concurrent.NewWorkQueue(ctx, name, stopper, concurrent.WithNumWorkers(numWorkers))
+		if err != nil {
+			b.Fatal(err)
+		}
+		bench(name, ex)
+	}
+}