diff --git a/lightning/config/config.go b/lightning/config/config.go
index 50a50519b..40cd0286d 100644
--- a/lightning/config/config.go
+++ b/lightning/config/config.go
@@ -72,6 +72,7 @@ type Lightning struct {
 	common.LogConfig
 	TableConcurrency  int  `toml:"table-concurrency" json:"table-concurrency"`
 	RegionConcurrency int  `toml:"region-concurrency" json:"region-concurrency"`
+	IOConcurrency     int  `toml:"io-concurrency" json:"io-concurrency"`
 	ProfilePort       int  `toml:"pprof-port" json:"pprof-port"`
 	CheckRequirements bool `toml:"check-requirements" json:"check-requirements"`
 }
@@ -130,6 +131,7 @@ func NewConfig() *Config {
 		App: Lightning{
 			RegionConcurrency: runtime.NumCPU(),
 			TableConcurrency:  8,
+			IOConcurrency:     5,
 			CheckRequirements: true,
 		},
 		TiDB: DBStore{
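
For reference, the new knob surfaces in the TOML configuration file through the struct tags above. Assuming the Lightning struct corresponds to the [lightning] section of the config file (as its existing table-concurrency and region-concurrency keys suggest), a minimal sketch of the setting looks like this; the value 5 mirrors the default installed by NewConfig:

[lightning]
table-concurrency = 8
# New in this change: caps how many data-file reads run concurrently.
io-concurrency = 5
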
"github.com/pingcap/check" "github.com/pingcap/tidb-lightning/lightning/config" "github.com/pingcap/tidb-lightning/lightning/mydump" + "github.com/pingcap/tidb-lightning/lightning/worker" + "github.com/pkg/errors" ) @@ -25,7 +28,8 @@ func (s *testMydumpParserSuite) TestReadRow(c *C) { "insert another_table values (10, 11, 12, '(13)', '(', 14, ')');", ) - parser := mydump.NewChunkParser(reader, config.ReadBlockSize) + ioWorkers := worker.NewRestoreWorkerPool(context.Background(), 5, "test") + parser := mydump.NewChunkParser(reader, config.ReadBlockSize, ioWorkers) c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ @@ -73,7 +77,8 @@ func (s *testMydumpParserSuite) TestReadChunks(c *C) { INSERT foo VALUES (29,30,31,32),(33,34,35,36); `) - parser := mydump.NewChunkParser(reader, config.ReadBlockSize) + ioWorkers := worker.NewRestoreWorkerPool(context.Background(), 5, "test") + parser := mydump.NewChunkParser(reader, config.ReadBlockSize, ioWorkers) chunks, err := parser.ReadChunks(32) c.Assert(err, IsNil) @@ -119,7 +124,8 @@ func (s *testMydumpParserSuite) TestNestedRow(c *C) { ("789",CONVERT("[]" USING UTF8MB4)); `) - parser := mydump.NewChunkParser(reader, config.ReadBlockSize) + ioWorkers := worker.NewRestoreWorkerPool(context.Background(), 5, "test") + parser := mydump.NewChunkParser(reader, config.ReadBlockSize, ioWorkers) chunks, err := parser.ReadChunks(96) c.Assert(err, IsNil) diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index 1bbe2af3a..782efd1fc 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -24,6 +24,8 @@ import ( "github.com/pingcap/tidb-lightning/lightning/metric" "github.com/pingcap/tidb-lightning/lightning/mydump" verify "github.com/pingcap/tidb-lightning/lightning/verification" + "github.com/pingcap/tidb-lightning/lightning/worker" + tidbcfg "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/util/hack" @@ -95,8 +97,9 @@ type RestoreController struct { cfg *config.Config dbMetas []*mydump.MDDatabaseMeta dbInfos map[string]*TidbDBInfo - tableWorkers *RestoreWorkerPool - regionWorkers *RestoreWorkerPool + tableWorkers *worker.RestoreWorkerPool + regionWorkers *worker.RestoreWorkerPool + ioWorkers *worker.RestoreWorkerPool importer *kv.Importer tidbMgr *TiDBManager postProcessLock sync.Mutex // a simple way to ensure post-processing is not concurrent without using complicated goroutines @@ -129,8 +132,9 @@ func NewRestoreController(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta, rc := &RestoreController{ cfg: cfg, dbMetas: dbMetas, - tableWorkers: NewRestoreWorkerPool(ctx, cfg.App.TableConcurrency, "table"), - regionWorkers: NewRestoreWorkerPool(ctx, cfg.App.RegionConcurrency, "region"), + tableWorkers: worker.NewRestoreWorkerPool(ctx, cfg.App.TableConcurrency, "table"), + regionWorkers: worker.NewRestoreWorkerPool(ctx, cfg.App.RegionConcurrency, "region"), + ioWorkers: worker.NewRestoreWorkerPool(ctx, cfg.App.IOConcurrency, "io"), importer: importer, tidbMgr: tidbMgr, @@ -439,9 +443,9 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error { // Note: We still need tableWorkers to control the concurrency of tables. In the future, we will investigate more about // the difference between restoring tables concurrently and restoring tables one by one. 
diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go
index 1bbe2af3a..782efd1fc 100644
--- a/lightning/restore/restore.go
+++ b/lightning/restore/restore.go
@@ -24,6 +24,8 @@ import (
 	"github.com/pingcap/tidb-lightning/lightning/metric"
 	"github.com/pingcap/tidb-lightning/lightning/mydump"
 	verify "github.com/pingcap/tidb-lightning/lightning/verification"
+	"github.com/pingcap/tidb-lightning/lightning/worker"
+
 	tidbcfg "github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/meta/autoid"
 	"github.com/pingcap/tidb/util/hack"
@@ -95,8 +97,9 @@ type RestoreController struct {
 	cfg             *config.Config
 	dbMetas         []*mydump.MDDatabaseMeta
 	dbInfos         map[string]*TidbDBInfo
-	tableWorkers    *RestoreWorkerPool
-	regionWorkers   *RestoreWorkerPool
+	tableWorkers    *worker.RestoreWorkerPool
+	regionWorkers   *worker.RestoreWorkerPool
+	ioWorkers       *worker.RestoreWorkerPool
 	importer        *kv.Importer
 	tidbMgr         *TiDBManager
 	postProcessLock sync.Mutex // a simple way to ensure post-processing is not concurrent without using complicated goroutines
@@ -129,8 +132,9 @@ func NewRestoreController(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta,
 	rc := &RestoreController{
 		cfg:           cfg,
 		dbMetas:       dbMetas,
-		tableWorkers:  NewRestoreWorkerPool(ctx, cfg.App.TableConcurrency, "table"),
-		regionWorkers: NewRestoreWorkerPool(ctx, cfg.App.RegionConcurrency, "region"),
+		tableWorkers:  worker.NewRestoreWorkerPool(ctx, cfg.App.TableConcurrency, "table"),
+		regionWorkers: worker.NewRestoreWorkerPool(ctx, cfg.App.RegionConcurrency, "region"),
+		ioWorkers:     worker.NewRestoreWorkerPool(ctx, cfg.App.IOConcurrency, "io"),
 		importer:      importer,
 		tidbMgr:       tidbMgr,
@@ -439,9 +443,9 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
 
 		// Note: We still need tableWorkers to control the concurrency of tables. In the future, we will investigate more about
 		// the difference between restoring tables concurrently and restoring tables one by one.
-		worker := rc.tableWorkers.Apply()
+		restoreWorker := rc.tableWorkers.Apply()
 		wg.Add(1)
-		go func(w *RestoreWorker, t *TableRestore, cp *TableCheckpoint) {
+		go func(w *worker.RestoreWorker, t *TableRestore, cp *TableCheckpoint) {
 			defer wg.Done()
 
 			closedEngine, err := t.restore(ctx, rc, cp)
@@ -465,7 +469,7 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
 			}
 
 			err = t.postProcess(ctx, closedEngine, rc, cp)
-		}(worker, tr, cp)
+		}(restoreWorker, tr, cp)
 	}
 }
@@ -548,15 +552,15 @@ func (t *TableRestore) restore(ctx context.Context, rc *RestoreController, cp *T
 		//  3. load kvs data (into kv deliver server)
 		//  4. flush kvs data (into tikv node)
 
-		cr, err := newChunkRestore(chunkIndex, chunk, rc.cfg.Mydumper.ReadBlockSize)
+		cr, err := newChunkRestore(chunkIndex, chunk, rc.cfg.Mydumper.ReadBlockSize, rc.ioWorkers)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
 		metric.ChunkCounter.WithLabelValues(metric.ChunkStatePending).Inc()
 
-		worker := rc.regionWorkers.Apply()
+		restoreWorker := rc.regionWorkers.Apply()
 		wg.Add(1)
-		go func(w *RestoreWorker, cr *chunkRestore) {
+		go func(w *worker.RestoreWorker, cr *chunkRestore) {
 			// Restore a chunk.
 			defer func() {
 				cr.close()
@@ -581,7 +585,7 @@ func (t *TableRestore) restore(ctx context.Context, rc *RestoreController, cp *T
 
 			handled := int(atomic.AddInt32(handledChunksCount, 1))
 			common.AppLogger.Infof("[%s] handled region count = %d (%s)", t.tableName, handled, common.Percent(handled, len(cp.Chunks)))
-		}(worker, cr)
+		}(restoreWorker, cr)
 	}
 
 	wg.Wait()
@@ -860,56 +864,18 @@ func (rc *RestoreController) getTables() []string {
 	return tables
 }
 
-////////////////////////////////////////////////////////////////
-
-type RestoreWorkerPool struct {
-	limit   int
-	workers chan *RestoreWorker
-	name    string
-}
-
-type RestoreWorker struct {
-	ID int64
-}
-
-func NewRestoreWorkerPool(ctx context.Context, limit int, name string) *RestoreWorkerPool {
-	workers := make(chan *RestoreWorker, limit)
-	for i := 0; i < limit; i++ {
-		workers <- &RestoreWorker{ID: int64(i + 1)}
-	}
-
-	metric.IdleWorkersGauge.WithLabelValues(name).Set(float64(limit))
-	return &RestoreWorkerPool{
-		limit:   limit,
-		workers: workers,
-		name:    name,
-	}
-}
-
-func (pool *RestoreWorkerPool) Apply() *RestoreWorker {
-	worker := <-pool.workers
-	metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(float64(len(pool.workers)))
-	return worker
-}
-func (pool *RestoreWorkerPool) Recycle(worker *RestoreWorker) {
-	pool.workers <- worker
-	metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(float64(len(pool.workers)))
-}
-
-////////////////////////////////////////////////////////////////
-
 type chunkRestore struct {
 	parser *mydump.ChunkParser
 	index  int
 	chunk  *ChunkCheckpoint
 }
 
-func newChunkRestore(index int, chunk *ChunkCheckpoint, blockBufSize int64) (*chunkRestore, error) {
+func newChunkRestore(index int, chunk *ChunkCheckpoint, blockBufSize int64, ioWorkers *worker.RestoreWorkerPool) (*chunkRestore, error) {
 	reader, err := os.Open(chunk.Key.Path)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
 
-	parser := mydump.NewChunkParser(reader, blockBufSize)
+	parser := mydump.NewChunkParser(reader, blockBufSize, ioWorkers)
 	reader.Seek(chunk.Chunk.Offset, io.SeekStart)
 	parser.SetPos(chunk.Chunk.Offset, chunk.Chunk.PrevRowIDMax)
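
A note on the rename from worker to restoreWorker in the hunks above: once the worker package is imported into restore.go, a local variable named worker shadows the package identifier, so type references such as *worker.RestoreWorker in the goroutine signatures would no longer resolve. The sketch below (the function and task names are illustrative, not from the diff) shows the renamed pattern that keeps the package name usable:

package sketch

import "github.com/pingcap/tidb-lightning/lightning/worker"

// A local named `worker` would shadow the imported `worker` package, breaking
// the `*worker.RestoreWorker` parameter type; calling the local `restoreWorker`
// avoids the clash.
func runTask(pool *worker.RestoreWorkerPool, task func()) {
	restoreWorker := pool.Apply()
	go func(w *worker.RestoreWorker) {
		defer pool.Recycle(w)
		task()
	}(restoreWorker)
}
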
"github.com/pingcap/tidb-lightning/lightning/metric" +) + +//////////////////////////////////////////////////////////////// + +type RestoreWorkerPool struct { + limit int + workers chan *RestoreWorker + name string +} + +type RestoreWorker struct { + ID int64 +} + +func NewRestoreWorkerPool(ctx context.Context, limit int, name string) *RestoreWorkerPool { + workers := make(chan *RestoreWorker, limit) + for i := 0; i < limit; i++ { + workers <- &RestoreWorker{ID: int64(i + 1)} + } + + metric.IdleWorkersGauge.WithLabelValues(name).Set(float64(limit)) + return &RestoreWorkerPool{ + limit: limit, + workers: workers, + name: name, + } +} + +func (pool *RestoreWorkerPool) Apply() *RestoreWorker { + worker := <-pool.workers + metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(float64(len(pool.workers))) + return worker +} +func (pool *RestoreWorkerPool) Recycle(worker *RestoreWorker) { + pool.workers <- worker + metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(float64(len(pool.workers))) +} + +////////////////////////////////////////////////////////////////