-
Notifications
You must be signed in to change notification settings - Fork 5.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
14 changed files
with
1,739 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
load("@io_bazel_rules_go//go:def.bzl", "go_library") | ||
|
||
go_library( | ||
name = "pooltask", | ||
srcs = ["task.go"], | ||
importpath = "github.com/pingcap/tidb/resourcemanager/pooltask", | ||
visibility = ["//visibility:public"], | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
// Copyright 2022 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package pooltask | ||
|
||
import ( | ||
"sync" | ||
) | ||
|
||
// Context is a interface that can be used to create a context. | ||
type Context[T any] interface { | ||
GetContext() T | ||
} | ||
|
||
// NilContext is to create a nil as context | ||
type NilContext struct{} | ||
|
||
// GetContext is to get a nil as context | ||
func (NilContext) GetContext() any { | ||
return nil | ||
} | ||
|
||
// TaskBox is a box which contains all info about pooltask. | ||
type TaskBox[T any, U any, C any, CT any, TF Context[CT]] struct { | ||
constArgs C | ||
contextFunc TF | ||
wg *sync.WaitGroup | ||
task chan Task[T] | ||
resultCh chan U | ||
taskID uint64 | ||
} | ||
|
||
// NewTaskBox is to create a task box for pool. | ||
func NewTaskBox[T any, U any, C any, CT any, TF Context[CT]](constArgs C, contextFunc TF, wg *sync.WaitGroup, taskCh chan Task[T], resultCh chan U, taskID uint64) TaskBox[T, U, C, CT, TF] { | ||
return TaskBox[T, U, C, CT, TF]{ | ||
constArgs: constArgs, | ||
contextFunc: contextFunc, | ||
wg: wg, | ||
task: taskCh, | ||
resultCh: resultCh, | ||
taskID: taskID, | ||
} | ||
} | ||
|
||
// TaskID is to get the task id. | ||
func (t TaskBox[T, U, C, CT, TF]) TaskID() uint64 { | ||
return t.taskID | ||
} | ||
|
||
// ConstArgs is to get the const args. | ||
func (t *TaskBox[T, U, C, CT, TF]) ConstArgs() C { | ||
return t.constArgs | ||
} | ||
|
||
// GetTaskCh is to get the task channel. | ||
func (t *TaskBox[T, U, C, CT, TF]) GetTaskCh() chan Task[T] { | ||
return t.task | ||
} | ||
|
||
// GetResultCh is to get result channel | ||
func (t *TaskBox[T, U, C, CT, TF]) GetResultCh() chan U { | ||
return t.resultCh | ||
} | ||
|
||
// GetContextFunc is to get context func. | ||
func (t *TaskBox[T, U, C, CT, TF]) GetContextFunc() TF { | ||
return t.contextFunc | ||
} | ||
|
||
// Done is to set the pooltask status to complete. | ||
func (t *TaskBox[T, U, C, CT, TF]) Done() { | ||
t.wg.Done() | ||
} | ||
|
||
// Clone is to copy the box | ||
func (t *TaskBox[T, U, C, CT, TF]) Clone() *TaskBox[T, U, C, CT, TF] { | ||
newBox := NewTaskBox[T, U, C, CT, TF](t.constArgs, t.contextFunc, t.wg, t.task, t.resultCh, t.taskID) | ||
return &newBox | ||
} | ||
|
||
// GPool is a goroutine pool. | ||
type GPool[T any, U any, C any, CT any, TF Context[CT]] interface { | ||
Tune(size int) | ||
} | ||
|
||
// TaskController is a controller that can control or watch the pool. | ||
type TaskController[T any, U any, C any, CT any, TF Context[CT]] struct { | ||
pool GPool[T, U, C, CT, TF] | ||
close chan struct{} | ||
wg *sync.WaitGroup | ||
taskID uint64 | ||
resultCh chan U | ||
} | ||
|
||
// NewTaskController create a controller to deal with pooltask's status. | ||
func NewTaskController[T any, U any, C any, CT any, TF Context[CT]](p GPool[T, U, C, CT, TF], taskID uint64, closeCh chan struct{}, wg *sync.WaitGroup, resultCh chan U) TaskController[T, U, C, CT, TF] { | ||
return TaskController[T, U, C, CT, TF]{ | ||
pool: p, | ||
taskID: taskID, | ||
close: closeCh, | ||
wg: wg, | ||
resultCh: resultCh, | ||
} | ||
} | ||
|
||
// Wait is to wait the pool task to stop. | ||
func (t *TaskController[T, U, C, CT, TF]) Wait() { | ||
<-t.close | ||
t.wg.Wait() | ||
close(t.resultCh) | ||
} | ||
|
||
// TaskID is to get the task id. | ||
func (t *TaskController[T, U, C, CT, TF]) TaskID() uint64 { | ||
return t.taskID | ||
} | ||
|
||
// Task is a task that can be executed. | ||
type Task[T any] struct { | ||
Task T | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
load("@io_bazel_rules_go//go:def.bzl", "go_library") | ||
|
||
go_library( | ||
name = "gpool", | ||
srcs = [ | ||
"gpool.go", | ||
"spinlock.go", | ||
], | ||
importpath = "github.com/pingcap/tidb/util/gpool", | ||
visibility = ["//visibility:public"], | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
// Copyright 2022 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package gpool | ||
|
||
import ( | ||
"errors" | ||
"sync/atomic" | ||
"time" | ||
) | ||
|
||
const ( | ||
// DefaultCleanIntervalTime is the interval time to clean up goroutines. | ||
DefaultCleanIntervalTime = 5 * time.Second | ||
|
||
// OPENED represents that the pool is opened. | ||
OPENED = iota | ||
|
||
// CLOSED represents that the pool is closed. | ||
CLOSED | ||
) | ||
|
||
var ( | ||
// ErrPoolClosed will be returned when submitting task to a closed pool. | ||
ErrPoolClosed = errors.New("this pool has been closed") | ||
|
||
// ErrPoolOverload will be returned when the pool is full and no workers available. | ||
ErrPoolOverload = errors.New("too many goroutines blocked on submit or Nonblocking is set") | ||
|
||
// ErrProducerClosed will be returned when the producer is closed. | ||
ErrProducerClosed = errors.New("this producer has been closed") | ||
) | ||
|
||
// BasePool is base class of pool | ||
type BasePool struct { | ||
name string | ||
generator atomic.Uint64 | ||
} | ||
|
||
// NewBasePool is to create a new BasePool. | ||
func NewBasePool() BasePool { | ||
return BasePool{} | ||
} | ||
|
||
// SetName is to set name. | ||
func (p *BasePool) SetName(name string) { | ||
p.name = name | ||
} | ||
|
||
// Name is to get name. | ||
func (p *BasePool) Name() string { | ||
return p.name | ||
} | ||
|
||
// NewTaskID is to get a new task ID. | ||
func (p *BasePool) NewTaskID() uint64 { | ||
return p.generator.Add(1) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
// Copyright 2022 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package gpool | ||
|
||
import ( | ||
"runtime" | ||
"sync" | ||
"sync/atomic" | ||
) | ||
|
||
type spinLock uint32 | ||
|
||
const maxBackoff = 16 | ||
|
||
func (sl *spinLock) Lock() { | ||
backoff := 1 | ||
for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) { | ||
// Leverage the exponential backoff algorithm, see https://en.wikipedia.org/wiki/Exponential_backoff. | ||
for i := 0; i < backoff; i++ { | ||
runtime.Gosched() | ||
} | ||
if backoff < maxBackoff { | ||
backoff <<= 1 | ||
} | ||
} | ||
} | ||
|
||
func (sl *spinLock) Unlock() { | ||
atomic.StoreUint32((*uint32)(sl), 0) | ||
} | ||
|
||
// NewSpinLock instantiates a spin-lock. | ||
func NewSpinLock() sync.Locker { | ||
return new(spinLock) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") | ||
|
||
go_library( | ||
name = "spmc", | ||
srcs = [ | ||
"option.go", | ||
"spmcpool.go", | ||
"worker.go", | ||
"worker_loop_queue.go", | ||
], | ||
importpath = "github.com/pingcap/tidb/util/gpool/spmc", | ||
visibility = ["//visibility:public"], | ||
deps = [ | ||
"//resourcemanager/pooltask", | ||
"//util/gpool", | ||
"//util/logutil", | ||
"@com_github_pingcap_errors//:errors", | ||
"@com_github_pingcap_log//:log", | ||
"@org_uber_go_atomic//:atomic", | ||
"@org_uber_go_zap//:zap", | ||
], | ||
) | ||
|
||
go_test( | ||
name = "spmc_test", | ||
srcs = [ | ||
"main_test.go", | ||
"spmcpool_benchmark_test.go", | ||
"spmcpool_test.go", | ||
"worker_loop_queue_test.go", | ||
], | ||
embed = [":spmc"], | ||
race = "on", | ||
deps = [ | ||
"//resourcemanager/pooltask", | ||
"//testkit/testsetup", | ||
"//util", | ||
"//util/gpool", | ||
"@com_github_stretchr_testify//require", | ||
"@org_uber_go_atomic//:atomic", | ||
"@org_uber_go_goleak//:goleak", | ||
], | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
// Copyright 2022 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package spmc | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/pingcap/tidb/testkit/testsetup" | ||
"go.uber.org/goleak" | ||
) | ||
|
||
func TestMain(m *testing.M) { | ||
testsetup.SetupForCommonTest() | ||
goleak.VerifyTestMain(m) | ||
} |
Oops, something went wrong.