Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gpool: register gpool into resource manager #40410

Merged
merged 8 commits into from
Jan 10, 2023
Merged
2 changes: 1 addition & 1 deletion resourcemanager/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (*ResourceManager) exec(pool *util.PoolContainer, cmd scheduler.Command) {
if cmd == scheduler.Hold {
return
}
if time.Since(pool.Pool.LastTunerTs()) > 200*time.Millisecond {
if time.Since(pool.Pool.LastTunerTs()) > util.MinSchedulerInterval.Load() {
con := pool.Pool.Cap()
switch cmd {
case scheduler.Downclock:
Expand Down
1 change: 0 additions & 1 deletion resourcemanager/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,5 @@ go_library(
deps = [
"//resourcemanager/util",
"//util/cpu",
"@org_uber_go_atomic//:atomic",
],
)
2 changes: 1 addition & 1 deletion resourcemanager/scheduler/cpu_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewCPUScheduler() *CPUScheduler {

// Tune is to tune the goroutine pool
func (*CPUScheduler) Tune(_ util.Component, pool util.GorotinuePool) Command {
if time.Since(pool.LastTunerTs()) < minCPUSchedulerInterval.Load() {
if time.Since(pool.LastTunerTs()) < util.MinSchedulerInterval.Load() {
return Hold
}
if cpu.GetCPUUsage() < 0.5 {
Expand Down
7 changes: 0 additions & 7 deletions resourcemanager/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,7 @@
package scheduler

import (
"time"

"github.com/pingcap/tidb/resourcemanager/util"
"go.uber.org/atomic"
)

var (
minCPUSchedulerInterval = atomic.NewDuration(time.Minute)
)

// Command is the command for scheduler
Expand Down
5 changes: 4 additions & 1 deletion resourcemanager/util/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ go_library(
],
importpath = "github.com/pingcap/tidb/resourcemanager/util",
visibility = ["//visibility:public"],
deps = ["@com_github_pingcap_errors//:errors"],
deps = [
"@com_github_pingcap_errors//:errors",
"@org_uber_go_atomic//:atomic",
],
)

go_test(
Expand Down
4 changes: 2 additions & 2 deletions resourcemanager/util/mock_gpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func NewMockGPool(name string) *MockGPool {
return &MockGPool{name: name}
}

// Release is only for test
func (*MockGPool) Release() {
// ReleaseAndWait is only for test
func (*MockGPool) ReleaseAndWait() {
panic("implement me")
}

Expand Down
23 changes: 11 additions & 12 deletions resourcemanager/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,23 @@

package util

import "time"
import (
"time"

"go.uber.org/atomic"
)

var (
// MinSchedulerInterval is the minimum interval between two scheduling.
MinSchedulerInterval = atomic.NewDuration(200 * time.Millisecond)
)

// GorotinuePool is a pool interface
type GorotinuePool interface {
Release()
ReleaseAndWait()
Tune(size int)
LastTunerTs() time.Time
MaxInFlight() int64
InFlight() int64
MinRT() uint64
MaxPASS() uint64
Cap() int
// LongRTT is to represent the baseline latency by tracking a measurement of the long term, less volatile RTT.
LongRTT() float64
UpdateLongRTT(f func(float64) float64)
// ShortRTT is to represent the current system latency by tracking a measurement of the short time, and more volatile RTT.
ShortRTT() uint64
GetQueueSize() int64
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some API will be used for statistics. But it will be released in this sprint. so it is removed.

Running() int
Name() string
}
Expand Down
1 change: 1 addition & 0 deletions util/gpool/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ go_library(
],
importpath = "github.com/pingcap/tidb/util/gpool",
visibility = ["//visibility:public"],
deps = ["@org_uber_go_atomic//:atomic"],
)
21 changes: 18 additions & 3 deletions util/gpool/gpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"errors"
"sync/atomic"
"time"

atomicutil "go.uber.org/atomic"
)

const (
Expand All @@ -44,13 +46,16 @@ var (

// BasePool is base class of pool
type BasePool struct {
name string
generator atomic.Uint64
name string
lastTuneTs atomicutil.Time
generator atomic.Uint64
}

// NewBasePool is to create a new BasePool.
func NewBasePool() BasePool {
return BasePool{}
return BasePool{
lastTuneTs: *atomicutil.NewTime(time.Now()),
}
}

// SetName is to set name.
Expand All @@ -67,3 +72,13 @@ func (p *BasePool) Name() string {
func (p *BasePool) NewTaskID() uint64 {
return p.generator.Add(1)
}

// LastTunerTs returns the last time when the pool was tuned.
func (p *BasePool) LastTunerTs() time.Time {
Copy link
Member Author

@hawkingrei hawkingrei Jan 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be used in the scheduler to avoid scheduling frequently.

return p.lastTuneTs.Load()
}

// SetLastTuneTs sets the last time when the pool was tuned.
func (p *BasePool) SetLastTuneTs(t time.Time) {
p.lastTuneTs.Store(t)
}
3 changes: 3 additions & 0 deletions util/gpool/spmc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ go_library(
importpath = "github.com/pingcap/tidb/util/gpool/spmc",
visibility = ["//visibility:public"],
deps = [
"//resourcemanager",
"//resourcemanager/pooltask",
"//resourcemanager/util",
"//util/gpool",
"//util/logutil",
"@com_github_pingcap_errors//:errors",
Expand All @@ -33,6 +35,7 @@ go_test(
race = "on",
deps = [
"//resourcemanager/pooltask",
"//resourcemanager/util",
"//testkit/testsetup",
"//util",
"//util/gpool",
Expand Down
9 changes: 8 additions & 1 deletion util/gpool/spmc/spmcpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/resourcemanager"
"github.com/pingcap/tidb/resourcemanager/pooltask"
"github.com/pingcap/tidb/resourcemanager/util"
"github.com/pingcap/tidb/util/gpool"
"github.com/pingcap/tidb/util/logutil"
atomicutil "go.uber.org/atomic"
Expand Down Expand Up @@ -55,7 +57,7 @@ type Pool[T any, U any, C any, CT any, TF pooltask.Context[CT]] struct {
}

// NewSPMCPool create a single producer, multiple consumer goroutine pool.
func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name string, size int32, options ...Option) (*Pool[T, U, C, CT, TF], error) {
func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name string, size int32, component util.Component, options ...Option) (*Pool[T, U, C, CT, TF], error) {
opts := loadOptions(options...)
if expiry := opts.ExpiryDuration; expiry <= 0 {
opts.ExpiryDuration = gpool.DefaultCleanIntervalTime
Expand All @@ -77,6 +79,10 @@ func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name stri
result.capacity.Add(size)
result.workers = newWorkerLoopQueue[T, U, C, CT, TF](int(size))
result.cond = sync.NewCond(result.lock)
err := resourcemanager.GlobalResourceManager.Register(result, name, component)
if err != nil {
return nil, err
}
// Start a goroutine to clean up expired workers periodically.
go result.purgePeriodically()
return result, nil
Expand Down Expand Up @@ -129,6 +135,7 @@ func (p *Pool[T, U, C, CT, TF]) Tune(size int) {
if capacity == -1 || size <= 0 || size == capacity {
return
}
p.SetLastTuneTs(time.Now())
p.capacity.Store(int32(size))
if size > capacity {
// boost
Expand Down
3 changes: 2 additions & 1 deletion util/gpool/spmc/spmcpool_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/tidb/resourcemanager/pooltask"
rmutil "github.com/pingcap/tidb/resourcemanager/util"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/gpool"
)
Expand All @@ -29,7 +30,7 @@ const (
)

func BenchmarkGPool(b *testing.B) {
p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("test", 10)
p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("test", 10, rmutil.UNKNOWN)
if err != nil {
b.Fatal(err)
}
Expand Down
12 changes: 7 additions & 5 deletions util/gpool/spmc/spmcpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/pingcap/tidb/resourcemanager/pooltask"
rmutil "github.com/pingcap/tidb/resourcemanager/util"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/gpool"
"github.com/stretchr/testify/require"
Expand All @@ -32,7 +33,7 @@ func TestPool(t *testing.T) {
myArgs := ConstArgs{a: 10}
// init the pool
// input type, output type, constArgs type
pool, err := NewSPMCPool[int, int, ConstArgs, any, pooltask.NilContext]("TestPool", 10)
pool, err := NewSPMCPool[int, int, ConstArgs, any, pooltask.NilContext]("TestPool", 10, rmutil.UNKNOWN)
require.NoError(t, err)
pool.SetConsumerFunc(func(task int, constArgs ConstArgs, ctx any) int {
return task + constArgs.a
Expand Down Expand Up @@ -76,7 +77,7 @@ func TestPoolWithEnoughCapacity(t *testing.T) {
poolsize = 30
concurrency = 6
)
p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithEnoughCapa", poolsize, WithExpiryDuration(DefaultExpiredTime))
p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithEnoughCapa", poolsize, rmutil.UNKNOWN, WithExpiryDuration(DefaultExpiredTime))
require.NoError(t, err)
defer p.ReleaseAndWait()
p.SetConsumerFunc(func(a struct{}, b int, c any) struct{} {
Expand Down Expand Up @@ -128,7 +129,7 @@ func TestPoolWithoutEnoughCapacity(t *testing.T) {
concurrency = 2
poolsize = 2
)
p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithoutEnoughCapa", poolsize,
p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithoutEnoughCapacity", poolsize, rmutil.UNKNOWN,
WithExpiryDuration(DefaultExpiredTime))
require.NoError(t, err)
defer p.ReleaseAndWait()
Expand Down Expand Up @@ -184,7 +185,7 @@ func TestPoolWithoutEnoughCapacityParallel(t *testing.T) {
concurrency = 2
poolsize = 2
)
p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithoutEnoughCapa", poolsize,
p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestPoolWithoutEnoughCapacityParallel", poolsize, rmutil.UNKNOWN,
WithExpiryDuration(DefaultExpiredTime), WithNonblocking(true))
require.NoError(t, err)
defer p.ReleaseAndWait()
Expand Down Expand Up @@ -236,7 +237,8 @@ func TestPoolWithoutEnoughCapacityParallel(t *testing.T) {
}

func TestBenchPool(t *testing.T) {
p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestBenchPool", 10, WithExpiryDuration(DefaultExpiredTime))
p, err := NewSPMCPool[struct{}, struct{}, int, any, pooltask.NilContext]("TestBenchPool", 10,
rmutil.UNKNOWN, WithExpiryDuration(DefaultExpiredTime))
require.NoError(t, err)
defer p.ReleaseAndWait()
p.SetConsumerFunc(func(a struct{}, b int, c any) struct{} {
Expand Down