Skip to content

Commit

Permalink
*: remove goroutine pool (#7564)
Browse files Browse the repository at this point in the history
goroutine pool was introduced to handle stack copy cost, Go1.11 has 
many optimizations for stack copy, after upgrading to Go1.1, goroutine
pool is not necessary any more.
  • Loading branch information
tiancaiamao authored Aug 31, 2018
1 parent 8d1acc2 commit 2b776ac
Show file tree
Hide file tree
Showing 8 changed files with 15 additions and 324 deletions.
9 changes: 1 addition & 8 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/goroutine_pool"
"github.com/pingcap/tipb/go-tipb"
"golang.org/x/net/context"
)
Expand All @@ -35,10 +34,6 @@ var (
_ SelectResult = (*streamResult)(nil)
)

var (
selectResultGP = gp.New(time.Minute * 2)
)

// SelectResult is an iterator of coprocessor partial results.
type SelectResult interface {
// Fetch fetches partial results from client.
Expand Down Expand Up @@ -75,9 +70,7 @@ type selectResult struct {
}

func (r *selectResult) Fetch(ctx context.Context) {
selectResultGP.Go(func() {
r.fetch(ctx)
})
go r.fetch(ctx)
}

func (r *selectResult) fetch(ctx context.Context) {
Expand Down
15 changes: 6 additions & 9 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/goroutine_pool"
binlog "github.com/pingcap/tipb/go-binlog"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
Expand All @@ -42,8 +41,6 @@ const (
actionCleanup twoPhaseCommitAction = 3
)

var twoPhaseCommitGP = gp.New(3 * time.Minute)

func (ca twoPhaseCommitAction) String() string {
switch ca {
case actionPrewrite:
Expand Down Expand Up @@ -225,13 +222,13 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
}
if action == actionCommit {
// Commit secondary batches in background goroutine to reduce latency.
twoPhaseCommitGP.Go(func() {
go func() {
e := c.doActionOnBatches(bo, action, batches)
if e != nil {
log.Debugf("con:%d 2PC async doActionOnBatches %s err: %v", c.connID, action, e)
metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("commit").Inc()
}
})
}()
} else {
err = c.doActionOnBatches(bo, action, batches)
}
Expand Down Expand Up @@ -272,7 +269,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
for _, batch1 := range batches {

batch := batch1
twoPhaseCommitGP.Go(func() {
go func() {
if action == actionCommit {
// Because the secondary batches of the commit actions are implemented to be
// committed asynchronously in background goroutines, we should not
Expand All @@ -288,7 +285,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
defer singleBatchCancel()
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
}
})
}()
}
var err error
for i := 0; i < len(batches); i++ {
Expand Down Expand Up @@ -571,7 +568,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
c.mu.RUnlock()
if !committed && !undetermined {
c.cleanWg.Add(1)
twoPhaseCommitGP.Go(func() {
go func() {
err := c.cleanupKeys(NewBackoffer(context.Background(), cleanupMaxBackoff).WithVars(c.txn.vars), c.keys)
if err != nil {
metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("rollback").Inc()
Expand All @@ -580,7 +577,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
log.Infof("con:%d 2PC clean up done, tid: %d", c.connID, c.startTS)
}
c.cleanWg.Done()
})
}()
}
}()

Expand Down
9 changes: 2 additions & 7 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,11 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/goroutine_pool"
"github.com/pingcap/tipb/go-tipb"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)

var copIteratorGP = gp.New(time.Minute)

// CopClient is coprocessor client.
type CopClient struct {
store *tikvStore
Expand Down Expand Up @@ -452,9 +449,7 @@ func (it *copIterator) open(ctx context.Context) {
finishCh: it.finishCh,
vars: it.vars,
}
copIteratorGP.Go(func() {
worker.run(ctx)
})
go worker.run(ctx)
}
taskSender := &copIteratorTaskSender{
taskCh: taskCh,
Expand All @@ -463,7 +458,7 @@ func (it *copIterator) open(ctx context.Context) {
finishCh: it.finishCh,
}
taskSender.respChan = it.respChan
copIteratorGP.Go(taskSender.run)
go taskSender.run()
}

func (sender *copIteratorTaskSender) run() {
Expand Down
10 changes: 4 additions & 6 deletions store/tikv/rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/goroutine_pool"
"golang.org/x/net/context"
)

var (
rawKVClientGP = gp.New(3 * time.Minute)
// MaxRawKVScanLimit is the maximum scan limit for rawkv Scan.
MaxRawKVScanLimit = 10240
// ErrMaxScanLimitExceeded is returned when the limit for rawkv Scan is to large.
Expand Down Expand Up @@ -349,11 +347,11 @@ func (c *RawKVClient) sendBatchReq(bo *Backoffer, keys [][]byte, cmdType tikvrpc
ches := make(chan singleBatchResp, len(batches))
for _, batch := range batches {
batch1 := batch
rawKVClientGP.Go(func() {
go func() {
singleBatchBackoffer, singleBatchCancel := bo.Fork()
defer singleBatchCancel()
ches <- c.doBatchReq(singleBatchBackoffer, batch1, cmdType)
})
}()
}

var firstError error
Expand Down Expand Up @@ -507,11 +505,11 @@ func (c *RawKVClient) sendBatchPut(bo *Backoffer, keys, values [][]byte) error {
ch := make(chan error, len(batches))
for _, batch := range batches {
batch1 := batch
rawKVClientGP.Go(func() {
go func() {
singleBatchBackoffer, singleBatchCancel := bo.Fork()
defer singleBatchCancel()
ch <- c.doBatchPut(singleBatchBackoffer, batch1)
})
}()
}

for i := 0; i < len(batches); i++ {
Expand Down
7 changes: 2 additions & 5 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/goroutine_pool"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
Expand All @@ -51,8 +50,6 @@ type tikvSnapshot struct {
vars *kv.Variables
}

var snapshotGP = gp.New(time.Minute)

// newTiKVSnapshot creates a snapshot of an TiKV store.
func newTiKVSnapshot(store *tikvStore, ver kv.Version) *tikvSnapshot {
return &tikvSnapshot{
Expand Down Expand Up @@ -123,11 +120,11 @@ func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, colle
ch := make(chan error)
for _, batch1 := range batches {
batch := batch1
snapshotGP.Go(func() {
go func() {
backoffer, cancel := bo.Fork()
defer cancel()
ch <- s.batchGetSingleRegion(backoffer, batch, collectF)
})
}()
}
for i := 0; i < len(batches); i++ {
if e := <-ch; e != nil {
Expand Down
30 changes: 0 additions & 30 deletions util/goroutine_pool/fake.go

This file was deleted.

124 changes: 0 additions & 124 deletions util/goroutine_pool/gp.go

This file was deleted.

Loading

0 comments on commit 2b776ac

Please sign in to comment.