Skip to content

Commit

Permalink
lightning: auto adjust region-split-size (#27389)
Browse files Browse the repository at this point in the history
  • Loading branch information
Little-Wallace authored Aug 26, 2021
1 parent 8370cab commit 9f28c4e
Show file tree
Hide file tree
Showing 17 changed files with 81 additions and 74 deletions.
6 changes: 5 additions & 1 deletion br/cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,11 @@ func importEngine(ctx context.Context, cfg *config.Config, tls *common.TLS, engi
return errors.Trace(err)
}

return errors.Trace(ce.Import(ctx))
regionSplitSize := int64(cfg.TikvImporter.RegionSplitSize)
if regionSplitSize == 0 {
regionSplitSize = int64(config.SplitRegionSize)
}
return errors.Trace(ce.Import(ctx, regionSplitSize))
}

func cleanupEngine(ctx context.Context, cfg *config.Config, tls *common.TLS, engine string) error {
Expand Down
10 changes: 5 additions & 5 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ type AbstractBackend interface {
// ImportEngine imports engine data to the backend. If it returns ErrDuplicateDetected,
// it means a duplicate was detected. In this situation, all data in the engine must be imported.
// It's safe to reset or cleanup this engine.
ImportEngine(ctx context.Context, engineUUID uuid.UUID) error
ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error

CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error

Expand Down Expand Up @@ -310,7 +310,7 @@ func (be Backend) CheckDiskQuota(quota int64) (
// into the target and then reset the engine to empty. This method will not
// close the engine. Make sure the engine is flushed manually before calling
// this method.
func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID) error {
func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error {
// DO NOT call be.abstract.CloseEngine()! The engine should still be writable after
// calling UnsafeImportAndReset().
closedEngine := ClosedEngine{
Expand All @@ -320,7 +320,7 @@ func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID
uuid: engineUUID,
},
}
if err := closedEngine.Import(ctx); err != nil {
if err := closedEngine.Import(ctx, regionSplitSize); err != nil {
return err
}
return be.abstract.ResetEngine(ctx, engineUUID)
Expand Down Expand Up @@ -436,12 +436,12 @@ func (en engine) unsafeClose(ctx context.Context, cfg *EngineConfig) (*ClosedEng
}

// Import the data written to the engine into the target.
func (engine *ClosedEngine) Import(ctx context.Context) error {
func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize int64) error {
var err error

for i := 0; i < importMaxRetryTimes; i++ {
task := engine.logger.With(zap.Int("retryCnt", i)).Begin(zap.InfoLevel, "import")
err = engine.backend.ImportEngine(ctx, engine.uuid)
err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize)
if !common.IsRetryableError(err) {
task.End(zap.ErrorLevel, err)
return err
Expand Down
18 changes: 9 additions & 9 deletions br/pkg/lightning/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (s *backendSuite) TestOpenCloseImportCleanUpEngine(c *C) {
Return(nil).
After(openCall)
importCall := s.mockBackend.EXPECT().
ImportEngine(ctx, engineUUID).
ImportEngine(ctx, engineUUID, gomock.Any()).
Return(nil).
After(closeCall)
s.mockBackend.EXPECT().
Expand All @@ -70,7 +70,7 @@ func (s *backendSuite) TestOpenCloseImportCleanUpEngine(c *C) {
c.Assert(err, IsNil)
closedEngine, err := engine.Close(ctx, nil)
c.Assert(err, IsNil)
err = closedEngine.Import(ctx)
err = closedEngine.Import(ctx, 1)
c.Assert(err, IsNil)
err = closedEngine.Cleanup(ctx)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -252,12 +252,12 @@ func (s *backendSuite) TestImportFailedNoRetry(c *C) {

s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any()).
Return(errors.Annotate(context.Canceled, "fake unrecoverable import error"))

closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
c.Assert(err, IsNil)
err = closedEngine.Import(ctx)
err = closedEngine.Import(ctx, 1)
c.Assert(err, ErrorMatches, "fake unrecoverable import error.*")
}

Expand All @@ -269,14 +269,14 @@ func (s *backendSuite) TestImportFailedWithRetry(c *C) {

s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any()).
Return(errors.New("fake recoverable import error")).
MinTimes(2)
s.mockBackend.EXPECT().RetryImportDelay().Return(time.Duration(0)).AnyTimes()

closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
c.Assert(err, IsNil)
err = closedEngine.Import(ctx)
err = closedEngine.Import(ctx, 1)
c.Assert(err, ErrorMatches, ".*fake recoverable import error")
}

Expand All @@ -288,16 +288,16 @@ func (s *backendSuite) TestImportFailedRecovered(c *C) {

s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any()).
Return(errors.New("fake recoverable import error"))
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any()).
Return(nil)
s.mockBackend.EXPECT().RetryImportDelay().Return(time.Duration(0)).AnyTimes()

closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
c.Assert(err, IsNil)
err = closedEngine.Import(ctx)
err = closedEngine.Import(ctx, 1)
c.Assert(err, IsNil)
}

Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (importer *importer) Flush(_ context.Context, _ uuid.UUID) error {
return nil
}

func (importer *importer) ImportEngine(ctx context.Context, engineUUID uuid.UUID) error {
func (importer *importer) ImportEngine(ctx context.Context, engineUUID uuid.UUID, _ int64) error {
importer.lock.Lock()
defer importer.lock.Unlock()
req := &import_kvpb.ImportEngineRequest{
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/importer/importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (s *importerSuite) TestCloseImportCleanupEngine(c *C) {

engine, err := s.engine.Close(s.ctx, nil)
c.Assert(err, IsNil)
err = engine.Import(s.ctx)
err = engine.Import(s.ctx, 1)
c.Assert(err, IsNil)
err = engine.Cleanup(s.ctx)
c.Assert(err, IsNil)
Expand Down
58 changes: 26 additions & 32 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,9 +807,7 @@ type local struct {
pdAddr string
g glue.Glue

localStoreDir string
regionSplitSize int64
regionSplitKeys int64
localStoreDir string

rangeConcurrency *worker.Pool
ingestConcurrency *worker.Pool
Expand Down Expand Up @@ -939,12 +937,6 @@ func NewLocalBackend(
}
}

regionSplitSize := int64(cfg.RegionSplitSize)
regionSplitKeys := int64(regionMaxKeyCount)
if regionSplitSize > defaultRegionSplitSize {
regionSplitKeys = int64(float64(regionSplitSize) / float64(defaultRegionSplitSize) * float64(regionMaxKeyCount))
}

local := &local{
engines: sync.Map{},
pdCtl: pdCtl,
Expand All @@ -953,10 +945,7 @@ func NewLocalBackend(
pdAddr: pdAddr,
g: g,

localStoreDir: localFile,
regionSplitSize: regionSplitSize,
regionSplitKeys: regionSplitKeys,

localStoreDir: localFile,
rangeConcurrency: worker.NewPool(ctx, rangeConcurrency, "range"),
ingestConcurrency: worker.NewPool(ctx, rangeConcurrency*2, "ingest"),
tcpConcurrency: rangeConcurrency,
Expand Down Expand Up @@ -1185,11 +1174,6 @@ func (local *local) RetryImportDelay() time.Duration {
return defaultRetryBackoffTime
}

func (local *local) MaxChunkSize() int {
// a batch size write to leveldb
return int(local.regionSplitSize)
}

func (local *local) ShouldPostProcess() bool {
return true
}
Expand Down Expand Up @@ -1365,6 +1349,8 @@ func (local *local) WriteToTiKV(
engineFile *File,
region *split.RegionInfo,
start, end []byte,
regionSplitSize int64,
regionSplitKeys int64,
) ([]*sst.SSTMeta, Range, rangeStats, error) {
for _, peer := range region.Region.GetPeers() {
var e error
Expand Down Expand Up @@ -1463,7 +1449,7 @@ func (local *local) WriteToTiKV(
size := int64(0)
totalCount := int64(0)
firstLoop := true
regionMaxSize := local.regionSplitSize * 4 / 3
regionMaxSize := regionSplitSize * 4 / 3

for iter.First(); iter.Valid(); iter.Next() {
size += int64(len(iter.Key()) + len(iter.Value()))
Expand Down Expand Up @@ -1492,7 +1478,7 @@ func (local *local) WriteToTiKV(
bytesBuf.Reset()
firstLoop = false
}
if size >= regionMaxSize || totalCount >= local.regionSplitKeys {
if size >= regionMaxSize || totalCount >= regionSplitKeys {
break
}
}
Expand Down Expand Up @@ -1624,7 +1610,7 @@ func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit
return ranges
}

func (local *local) readAndSplitIntoRange(ctx context.Context, engineFile *File) ([]Range, error) {
func (local *local) readAndSplitIntoRange(ctx context.Context, engineFile *File, regionSplitSize int64, regionSplitKeys int64) ([]Range, error) {
iter := newKeyIter(ctx, engineFile, &pebble.IterOptions{})
defer iter.Close()

Expand Down Expand Up @@ -1653,7 +1639,7 @@ func (local *local) readAndSplitIntoRange(ctx context.Context, engineFile *File)
engineFileLength := engineFile.Length.Load()

// no need to split into ranges when the engine is within regionSplitSize and regionSplitKeys
if engineFileTotalSize <= local.regionSplitSize && engineFileLength <= local.regionSplitKeys {
if engineFileTotalSize <= regionSplitSize && engineFileLength <= regionSplitKeys {
ranges := []Range{{start: firstKey, end: endKey}}
return ranges, nil
}
Expand All @@ -1664,7 +1650,7 @@ func (local *local) readAndSplitIntoRange(ctx context.Context, engineFile *File)
}

ranges := splitRangeBySizeProps(Range{start: firstKey, end: endKey}, sizeProps,
local.regionSplitSize, local.regionSplitKeys)
regionSplitSize, regionSplitKeys)

log.L().Info("split engine key ranges", zap.Stringer("engine", engineFile.UUID),
zap.Int64("totalSize", engineFileTotalSize), zap.Int64("totalCount", engineFileLength),
Expand All @@ -1678,6 +1664,8 @@ func (local *local) writeAndIngestByRange(
ctxt context.Context,
engineFile *File,
start, end []byte,
regionSplitSize int64,
regionSplitKeys int64,
) error {
ito := &pebble.IterOptions{
LowerBound: start,
Expand Down Expand Up @@ -1736,7 +1724,7 @@ WriteAndIngest:
zap.Binary("end", region.Region.GetEndKey()), zap.Reflect("peers", region.Region.GetPeers()))

w := local.ingestConcurrency.Apply()
err = local.writeAndIngestPairs(ctx, engineFile, region, pairStart, end)
err = local.writeAndIngestPairs(ctx, engineFile, region, pairStart, end, regionSplitSize, regionSplitKeys)
local.ingestConcurrency.Recycle(w)
if err != nil {
if common.IsContextCanceledError(err) {
Expand Down Expand Up @@ -1774,6 +1762,8 @@ func (local *local) writeAndIngestPairs(
engineFile *File,
region *split.RegionInfo,
start, end []byte,
regionSplitSize int64,
regionSplitKeys int64,
) error {
var err error

Expand All @@ -1782,7 +1772,7 @@ loopWrite:
var metas []*sst.SSTMeta
var finishedRange Range
var rangeStats rangeStats
metas, finishedRange, rangeStats, err = local.WriteToTiKV(ctx, engineFile, region, start, end)
metas, finishedRange, rangeStats, err = local.WriteToTiKV(ctx, engineFile, region, start, end, regionSplitSize, regionSplitKeys)
if err != nil {
if common.IsContextCanceledError(err) {
return err
Expand Down Expand Up @@ -1889,7 +1879,7 @@ loopWrite:
return errors.Trace(err)
}

func (local *local) writeAndIngestByRanges(ctx context.Context, engineFile *File, ranges []Range) error {
func (local *local) writeAndIngestByRanges(ctx context.Context, engineFile *File, ranges []Range, regionSplitSize int64, regionSplitKeys int64) error {
if engineFile.Length.Load() == 0 {
// engine is empty; this is likely because it's an index engine but the table contains no index
log.L().Info("engine contains no data", zap.Stringer("uuid", engineFile.UUID))
Expand Down Expand Up @@ -1921,7 +1911,7 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engineFile *File
// max retry backoff time: 2+4+8+16=30s
backOffTime := time.Second
for i := 0; i < maxRetryTimes; i++ {
err = local.writeAndIngestByRange(ctx, engineFile, startKey, endKey)
err = local.writeAndIngestByRange(ctx, engineFile, startKey, endKey, regionSplitSize, regionSplitKeys)
if err == nil || common.IsContextCanceledError(err) {
return
}
Expand Down Expand Up @@ -1967,7 +1957,7 @@ func (r *syncedRanges) reset() {
r.Unlock()
}

func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID) error {
func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error {
lf := local.lockEngine(engineUUID, importMutexStateImport)
if lf == nil {
// skip if engine not exist. See the comment of `CloseEngine` for more detail.
Expand All @@ -1981,9 +1971,13 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID) erro
log.L().Info("engine contains no kv, skip import", zap.Stringer("engine", engineUUID))
return nil
}
regionSplitKeys := int64(regionMaxKeyCount)
if regionSplitSize > defaultRegionSplitSize {
regionSplitKeys = int64(float64(regionSplitSize) / float64(defaultRegionSplitSize) * float64(regionMaxKeyCount))
}

// split the sorted file into ranges bounded by regionSplitSize and regionSplitKeys
ranges, err := local.readAndSplitIntoRange(ctx, lf)
ranges, err := local.readAndSplitIntoRange(ctx, lf, regionSplitSize, regionSplitKeys)
if err != nil {
return err
}
Expand All @@ -1999,10 +1993,10 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID) erro

// if all the kv can fit in one region, skip split regions. TiDB will split one region for
// the table when table is created.
needSplit := len(unfinishedRanges) > 1 || lfTotalSize > local.regionSplitSize || lfLength > local.regionSplitKeys
needSplit := len(unfinishedRanges) > 1 || lfTotalSize > regionSplitSize || lfLength > regionSplitKeys
// split region by given ranges
for i := 0; i < maxRetryTimes; i++ {
err = local.SplitAndScatterRegionByRanges(ctx, unfinishedRanges, lf.tableInfo, needSplit)
err = local.SplitAndScatterRegionByRanges(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize)
if err == nil || common.IsContextCanceledError(err) {
break
}
Expand All @@ -2016,7 +2010,7 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID) erro
}

// start to write to kv and ingest
err = local.writeAndIngestByRanges(ctx, lf, unfinishedRanges)
err = local.writeAndIngestByRanges(ctx, lf, unfinishedRanges, regionSplitSize, regionSplitKeys)
if err != nil {
log.L().Error("write and ingest engine failed", log.ShortError(err))
return err
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/local/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (local *local) SplitAndScatterRegionByRanges(
ranges []Range,
tableInfo *checkpoints.TidbTableInfo,
needSplit bool,
regionSplitSize int64,
) error {
if len(ranges) == 0 {
return nil
Expand Down Expand Up @@ -270,7 +271,7 @@ func (local *local) SplitAndScatterRegionByRanges(
if !ok {
log.L().Warn("region stats not found", zap.Uint64("region", regionID))
}
if len(keys) == 1 && regionSize < local.regionSplitSize {
if len(keys) == 1 && regionSize < regionSplitSize {
skippedKeys++
}
select {
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (s *localSuite) doTestBatchSplitRegionByRanges(ctx context.Context, c *C, h
start = end
}

err = local.SplitAndScatterRegionByRanges(ctx, ranges, nil, true)
err = local.SplitAndScatterRegionByRanges(ctx, ranges, nil, true, 1000)
if len(errPat) == 0 {
c.Assert(err, IsNil)
} else {
Expand Down Expand Up @@ -643,7 +643,7 @@ func (s *localSuite) doTestBatchSplitByRangesWithClusteredIndex(c *C, hook clien
start = e
}

err := local.SplitAndScatterRegionByRanges(ctx, ranges, nil, true)
err := local.SplitAndScatterRegionByRanges(ctx, ranges, nil, true, 1000)
c.Assert(err, IsNil)

startKey := codec.EncodeBytes([]byte{}, rangeKeys[0])
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/noop/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (b noopBackend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig,
return nil
}

func (b noopBackend) ImportEngine(ctx context.Context, engineUUID uuid.UUID) error {
func (b noopBackend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/tidb/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func (be *tidbBackend) CollectRemoteDuplicateRows(ctx context.Context, tbl table
panic("Unsupported Operation")
}

func (be *tidbBackend) ImportEngine(context.Context, uuid.UUID) error {
func (be *tidbBackend) ImportEngine(context.Context, uuid.UUID, int64) error {
return nil
}

Expand Down
Loading

0 comments on commit 9f28c4e

Please sign in to comment.