From 74e072da45c67516063b3a95197ce88286bad7e9 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 9 Jan 2023 14:26:45 +0800 Subject: [PATCH 1/6] gpool: register gpool into resource manager Signed-off-by: Weizhen Wang --- resourcemanager/util/util.go | 12 +----------- util/gpool/BUILD.bazel | 1 + util/gpool/gpool.go | 21 ++++++++++++++++++--- util/gpool/spmc/BUILD.bazel | 3 +++ util/gpool/spmc/spmcpool.go | 9 ++++++++- util/gpool/spmc/spmcpool_benchmark_test.go | 3 ++- util/gpool/spmc/spmcpool_test.go | 12 +++++++----- 7 files changed, 40 insertions(+), 21 deletions(-) diff --git a/resourcemanager/util/util.go b/resourcemanager/util/util.go index d5b988c344295..49a7d68bb58fd 100644 --- a/resourcemanager/util/util.go +++ b/resourcemanager/util/util.go @@ -18,20 +18,10 @@ import "time" // GorotinuePool is a pool interface type GorotinuePool interface { - Release() + ReleaseAndWait() Tune(size int) LastTunerTs() time.Time - MaxInFlight() int64 - InFlight() int64 - MinRT() uint64 - MaxPASS() uint64 Cap() int - // LongRTT is to represent the baseline latency by tracking a measurement of the long term, less volatile RTT. - LongRTT() float64 - UpdateLongRTT(f func(float64) float64) - // ShortRTT is to represent the current system latency by tracking a measurement of the short time, and more volatile RTT. - ShortRTT() uint64 - GetQueueSize() int64 Running() int Name() string } diff --git a/util/gpool/BUILD.bazel b/util/gpool/BUILD.bazel index 04a3dc25e7cd0..4f9eb753be57a 100644 --- a/util/gpool/BUILD.bazel +++ b/util/gpool/BUILD.bazel @@ -8,4 +8,5 @@ go_library( ], importpath = "github.com/pingcap/tidb/util/gpool", visibility = ["//visibility:public"], + deps = ["@org_uber_go_atomic//:atomic"], ) diff --git a/util/gpool/gpool.go b/util/gpool/gpool.go index 7611d29542a31..bd65eaca9f505 100644 --- a/util/gpool/gpool.go +++ b/util/gpool/gpool.go @@ -18,6 +18,8 @@ import ( "errors" "sync/atomic" "time" + + atomicutil "go.uber.org/atomic" ) const ( @@ -44,13 +46,16 @@ var ( // BasePool is base class of pool type BasePool struct { - name string - generator atomic.Uint64 + name string + lastTuneTs atomicutil.Time + generator atomic.Uint64 } // NewBasePool is to create a new BasePool. func NewBasePool() BasePool { - return BasePool{} + return BasePool{ + lastTuneTs: *atomicutil.NewTime(time.Now()), + } } // SetName is to set name. @@ -67,3 +72,13 @@ func (p *BasePool) Name() string { func (p *BasePool) NewTaskID() uint64 { return p.generator.Add(1) } + +// LastTunerTs returns the last time when the pool was tuned. +func (p *BasePool) LastTunerTs() time.Time { + return p.lastTuneTs.Load() +} + +// SetLastTuneTs sets the last time when the pool was tuned. +func (p *BasePool) SetLastTuneTs(t time.Time) { + p.lastTuneTs.Store(t) +} diff --git a/util/gpool/spmc/BUILD.bazel b/util/gpool/spmc/BUILD.bazel index db48d9771cb17..1c951a219fb20 100644 --- a/util/gpool/spmc/BUILD.bazel +++ b/util/gpool/spmc/BUILD.bazel @@ -11,7 +11,9 @@ go_library( importpath = "github.com/pingcap/tidb/util/gpool/spmc", visibility = ["//visibility:public"], deps = [ + "//resourcemanager", "//resourcemanager/pooltask", + "//resourcemanager/util", "//util/gpool", "//util/logutil", "@com_github_pingcap_errors//:errors", @@ -33,6 +35,7 @@ go_test( race = "on", deps = [ "//resourcemanager/pooltask", + "//resourcemanager/util", "//testkit/testsetup", "//util", "//util/gpool", diff --git a/util/gpool/spmc/spmcpool.go b/util/gpool/spmc/spmcpool.go index b69c7a05e0eca..9690db85b0af7 100644 --- a/util/gpool/spmc/spmcpool.go +++ b/util/gpool/spmc/spmcpool.go @@ -21,7 +21,9 @@ import ( "time" "github.com/pingcap/log" + "github.com/pingcap/tidb/resourcemanager" "github.com/pingcap/tidb/resourcemanager/pooltask" + "github.com/pingcap/tidb/resourcemanager/util" "github.com/pingcap/tidb/util/gpool" "github.com/pingcap/tidb/util/logutil" atomicutil "go.uber.org/atomic" @@ -55,7 +57,7 @@ type Pool[T any, U any, C any, CT any, TF pooltask.Context[CT]] struct { } // NewSPMCPool create a single producer, multiple consumer goroutine pool. -func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name string, size int32, options ...Option) (*Pool[T, U, C, CT, TF], error) { +func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name string, size int32, component util.Component, options ...Option) (*Pool[T, U, C, CT, TF], error) { opts := loadOptions(options...) if expiry := opts.ExpiryDuration; expiry <= 0 { opts.ExpiryDuration = gpool.DefaultCleanIntervalTime @@ -77,6 +79,10 @@ func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name stri result.capacity.Add(size) result.workers = newWorkerLoopQueue[T, U, C, CT, TF](int(size)) result.cond = sync.NewCond(result.lock) + err := resourcemanager.GlobalResourceManager.Register(result, name, component) + if err != nil { + return nil, err + } // Start a goroutine to clean up expired workers periodically. go result.purgePeriodically() return result, nil @@ -129,6 +135,7 @@ func (p *Pool[T, U, C, CT, TF]) Tune(size int) { if capacity == -1 || size <= 0 || size == capacity { return } + p.SetLastTuneTs(time.Now().Add(p.options.LimitDuration)) p.capacity.Store(int32(size)) if size > capacity { // boost diff --git a/util/gpool/spmc/spmcpool_benchmark_test.go b/util/gpool/spmc/spmcpool_benchmark_test.go index db3a4f0824e78..2d2f39511ea1c 100644 --- a/util/gpool/spmc/spmcpool_benchmark_test.go +++ b/util/gpool/spmc/spmcpool_benchmark_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/pingcap/tidb/resourcemanager/pooltask" + rmutil "github.com/pingcap/tidb/resourcemanager/util" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/gpool" ) @@ -29,7 +30,7 @@ const ( ) func BenchmarkGPool(b *testing.B) { - p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("test", 10) + p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("test", 10, rmutil.UNKNOWN) if err != nil { b.Fatal(err) } diff --git a/util/gpool/spmc/spmcpool_test.go b/util/gpool/spmc/spmcpool_test.go index 984f501789c47..20c64658fef7a 100644 --- a/util/gpool/spmc/spmcpool_test.go +++ b/util/gpool/spmc/spmcpool_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/pingcap/tidb/resourcemanager/pooltask" + rmutil "github.com/pingcap/tidb/resourcemanager/util" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/gpool" "github.com/stretchr/testify/require" @@ -32,7 +33,7 @@ func TestPool(t *testing.T) { myArgs := ConstArgs{a: 10} // init the pool // input type, output type, constArgs type - pool, err := NewSPMCPool[int, int, ConstArgs, any, pooltask.NilContext]("TestPool", 10) + pool, err := NewSPMCPool[int, int, ConstArgs, any, pooltask.NilContext]("TestPool", 10, rmutil.UNKNOWN) require.NoError(t, err) pool.SetConsumerFunc(func(task int, constArgs ConstArgs, ctx any) int { return task + constArgs.a @@ -76,7 +77,7 @@ func TestPoolWithEnoughCapacity(t *testing.T) { poolsize = 30 concurrency = 6 ) - p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithEnoughCapa", poolsize, WithExpiryDuration(DefaultExpiredTime)) + p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithEnoughCapa", poolsize, rmutil.UNKNOWN, WithExpiryDuration(DefaultExpiredTime)) require.NoError(t, err) defer p.ReleaseAndWait() p.SetConsumerFunc(func(a struct{}, b int, c any) struct{} { @@ -128,7 +129,7 @@ func TestPoolWithoutEnoughCapacity(t *testing.T) { concurrency = 2 poolsize = 2 ) - p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithoutEnoughCapa", poolsize, + p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithoutEnoughCapa", poolsize, rmutil.UNKNOWN, WithExpiryDuration(DefaultExpiredTime)) require.NoError(t, err) defer p.ReleaseAndWait() @@ -184,7 +185,7 @@ func TestPoolWithoutEnoughCapacityParallel(t *testing.T) { concurrency = 2 poolsize = 2 ) - p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithoutEnoughCapa", poolsize, + p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithoutEnoughCapa", poolsize, rmutil.UNKNOWN, WithExpiryDuration(DefaultExpiredTime), WithNonblocking(true)) require.NoError(t, err) defer p.ReleaseAndWait() @@ -236,7 +237,8 @@ func TestPoolWithoutEnoughCapacityParallel(t *testing.T) { } func TestBenchPool(t *testing.T) { - p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestBenchPool", 10, WithExpiryDuration(DefaultExpiredTime)) + p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestBenchPool", 10, + rmutil.UNKNOWN, WithExpiryDuration(DefaultExpiredTime)) require.NoError(t, err) defer p.ReleaseAndWait() p.SetConsumerFunc(func(a struct{}, b int, c any) struct{} { From c02bc71450b8af0d1381d0b2e07a1c7815b718cb Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 9 Jan 2023 14:37:29 +0800 Subject: [PATCH 2/6] gpool: register gpool into resource manager Signed-off-by: Weizhen Wang --- resourcemanager/util/mock_gpool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resourcemanager/util/mock_gpool.go b/resourcemanager/util/mock_gpool.go index b9e66dd9afeab..3f87c96a0cad3 100644 --- a/resourcemanager/util/mock_gpool.go +++ b/resourcemanager/util/mock_gpool.go @@ -27,7 +27,7 @@ func NewMockGPool(name string) *MockGPool { } // Release is only for test -func (*MockGPool) Release() { +func (*MockGPool) ReleaseAndWait() { panic("implement me") } From 49775156ac0bd58120166c8b03da7fd0e101c6d5 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 9 Jan 2023 14:41:50 +0800 Subject: [PATCH 3/6] gpool: register gpool into resource manager Signed-off-by: Weizhen Wang --- resourcemanager/util/mock_gpool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resourcemanager/util/mock_gpool.go b/resourcemanager/util/mock_gpool.go index 3f87c96a0cad3..9697d2942d6ee 100644 --- a/resourcemanager/util/mock_gpool.go +++ b/resourcemanager/util/mock_gpool.go @@ -26,7 +26,7 @@ func NewMockGPool(name string) *MockGPool { return &MockGPool{name: name} } -// Release is only for test +// ReleaseAndWait is only for test func (*MockGPool) ReleaseAndWait() { panic("implement me") } From 3aba102b055dcb295b2f572755194a6ad8899965 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 9 Jan 2023 14:47:33 +0800 Subject: [PATCH 4/6] gpool: register gpool into resource manager Signed-off-by: Weizhen Wang --- util/gpool/spmc/spmcpool_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/util/gpool/spmc/spmcpool_test.go b/util/gpool/spmc/spmcpool_test.go index 20c64658fef7a..bc9a197815ad7 100644 --- a/util/gpool/spmc/spmcpool_test.go +++ b/util/gpool/spmc/spmcpool_test.go @@ -129,7 +129,7 @@ func TestPoolWithoutEnoughCapacity(t *testing.T) { concurrency = 2 poolsize = 2 ) - p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithoutEnoughCapa", poolsize, rmutil.UNKNOWN, + p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithoutEnoughCapacity", poolsize, rmutil.UNKNOWN, WithExpiryDuration(DefaultExpiredTime)) require.NoError(t, err) defer p.ReleaseAndWait() @@ -185,7 +185,7 @@ func TestPoolWithoutEnoughCapacityParallel(t *testing.T) { concurrency = 2 poolsize = 2 ) - p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithoutEnoughCapa", poolsize, rmutil.UNKNOWN, + p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithoutEnoughCapacityParallel", poolsize, rmutil.UNKNOWN, WithExpiryDuration(DefaultExpiredTime), WithNonblocking(true)) require.NoError(t, err) defer p.ReleaseAndWait() From 205a7240a5ab45906b3a9b5c8ff1c2151fe0762d Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 9 Jan 2023 15:27:40 +0800 Subject: [PATCH 5/6] gpool: register gpool into resource manager Signed-off-by: Weizhen Wang --- resourcemanager/schedule.go | 2 +- resourcemanager/scheduler/BUILD.bazel | 1 - resourcemanager/scheduler/cpu_scheduler.go | 2 +- resourcemanager/scheduler/scheduler.go | 7 ------- resourcemanager/util/BUILD.bazel | 5 ++++- resourcemanager/util/util.go | 10 +++++++++- util/gpool/spmc/spmcpool.go | 2 +- 7 files changed, 16 insertions(+), 13 deletions(-) diff --git a/resourcemanager/schedule.go b/resourcemanager/schedule.go index 41560eed5c2a4..f6ac691e09b15 100644 --- a/resourcemanager/schedule.go +++ b/resourcemanager/schedule.go @@ -47,7 +47,7 @@ func (*ResourceManager) exec(pool *util.PoolContainer, cmd scheduler.Command) { if cmd == scheduler.Hold { return } - if time.Since(pool.Pool.LastTunerTs()) > 200*time.Millisecond { + if time.Since(pool.Pool.LastTunerTs()) > util.MinSchedulerInterval.Load() { con := pool.Pool.Cap() switch cmd { case scheduler.Downclock: diff --git a/resourcemanager/scheduler/BUILD.bazel b/resourcemanager/scheduler/BUILD.bazel index 5dc17e8412d17..39bd88f030372 100644 --- a/resourcemanager/scheduler/BUILD.bazel +++ b/resourcemanager/scheduler/BUILD.bazel @@ -11,6 +11,5 @@ go_library( deps = [ "//resourcemanager/util", "//util/cpu", - "@org_uber_go_atomic//:atomic", ], ) diff --git a/resourcemanager/scheduler/cpu_scheduler.go b/resourcemanager/scheduler/cpu_scheduler.go index 7d0bdf1d31a07..217c5aecbf1dd 100644 --- a/resourcemanager/scheduler/cpu_scheduler.go +++ b/resourcemanager/scheduler/cpu_scheduler.go @@ -31,7 +31,7 @@ func NewCPUScheduler() *CPUScheduler { // Tune is to tune the goroutine pool func (*CPUScheduler) Tune(_ util.Component, pool util.GorotinuePool) Command { - if time.Since(pool.LastTunerTs()) < minCPUSchedulerInterval.Load() { + if time.Since(pool.LastTunerTs()) < util.MinSchedulerInterval.Load() { return Hold } if cpu.GetCPUUsage() < 0.5 { diff --git a/resourcemanager/scheduler/scheduler.go b/resourcemanager/scheduler/scheduler.go index 6cba0e18923cc..3af8e6aff5b0b 100644 --- a/resourcemanager/scheduler/scheduler.go +++ b/resourcemanager/scheduler/scheduler.go @@ -15,14 +15,7 @@ package scheduler import ( - "time" - "github.com/pingcap/tidb/resourcemanager/util" - "go.uber.org/atomic" -) - -var ( - minCPUSchedulerInterval = atomic.NewDuration(time.Minute) ) // Command is the command for scheduler diff --git a/resourcemanager/util/BUILD.bazel b/resourcemanager/util/BUILD.bazel index 7688b26a93d93..1c5396db6049b 100644 --- a/resourcemanager/util/BUILD.bazel +++ b/resourcemanager/util/BUILD.bazel @@ -9,7 +9,10 @@ go_library( ], importpath = "github.com/pingcap/tidb/resourcemanager/util", visibility = ["//visibility:public"], - deps = ["@com_github_pingcap_errors//:errors"], + deps = [ + "@com_github_pingcap_errors//:errors", + "@org_uber_go_atomic//:atomic", + ], ) go_test( diff --git a/resourcemanager/util/util.go b/resourcemanager/util/util.go index 49a7d68bb58fd..b7c9bd4211c17 100644 --- a/resourcemanager/util/util.go +++ b/resourcemanager/util/util.go @@ -14,7 +14,15 @@ package util -import "time" +import ( + "time" + + "go.uber.org/atomic" +) + +var ( + MinSchedulerInterval = atomic.NewDuration(200 * time.Millisecond) +) // GorotinuePool is a pool interface type GorotinuePool interface { diff --git a/util/gpool/spmc/spmcpool.go b/util/gpool/spmc/spmcpool.go index 9690db85b0af7..0f81d86448828 100644 --- a/util/gpool/spmc/spmcpool.go +++ b/util/gpool/spmc/spmcpool.go @@ -135,7 +135,7 @@ func (p *Pool[T, U, C, CT, TF]) Tune(size int) { if capacity == -1 || size <= 0 || size == capacity { return } - p.SetLastTuneTs(time.Now().Add(p.options.LimitDuration)) + p.SetLastTuneTs(time.Now()) p.capacity.Store(int32(size)) if size > capacity { // boost From d6239004e7ae7c626c6e080126360fb0261178aa Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 9 Jan 2023 15:31:20 +0800 Subject: [PATCH 6/6] gpool: register gpool into resource manager Signed-off-by: Weizhen Wang --- resourcemanager/util/util.go | 1 + 1 file changed, 1 insertion(+) diff --git a/resourcemanager/util/util.go b/resourcemanager/util/util.go index b7c9bd4211c17..4d433975fabb7 100644 --- a/resourcemanager/util/util.go +++ b/resourcemanager/util/util.go @@ -21,6 +21,7 @@ import ( ) var ( + // MinSchedulerInterval is the minimum interval between two scheduling. MinSchedulerInterval = atomic.NewDuration(200 * time.Millisecond) )