Skip to content

Commit

Permalink
[lightning] add minRegionNum parameter (pingcap#44)
Browse files Browse the repository at this point in the history
* add min region number

* sync with upstream
  • Loading branch information
Shouyan guo authored and GitHub Enterprise committed Mar 5, 2024
1 parent 4d8dfde commit ac67ddf
Show file tree
Hide file tree
Showing 10 changed files with 29 additions and 15 deletions.
10 changes: 5 additions & 5 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ type AbstractBackend interface {
// ImportEngine imports engine data to the backend. If it returns ErrDuplicateDetected,
// it means there is duplicate detected. For this situation, all data in the engine must be imported.
// It's safe to reset or cleanup this engine.
ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error
ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys, minRegionNum int64) error

CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error

Expand Down Expand Up @@ -327,7 +327,7 @@ func (be Backend) CheckDiskQuota(quota int64) (
// into the target and then reset the engine to empty. This method will not
// close the engine. Make sure the engine is flushed manually before calling
// this method.
func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys, minRegionNum int64) error {
// DO NOT call be.abstract.CloseEngine()! The engine should still be writable after
// calling UnsafeImportAndReset().
closedEngine := ClosedEngine{
Expand All @@ -337,7 +337,7 @@ func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID
uuid: engineUUID,
},
}
if err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys); err != nil {
if err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys, minRegionNum); err != nil {
return err
}
return be.abstract.ResetEngine(ctx, engineUUID)
Expand Down Expand Up @@ -469,12 +469,12 @@ func (en engine) unsafeClose(ctx context.Context, cfg *EngineConfig) (*ClosedEng
}

// Import the data written to the engine into the target.
func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize, regionSplitKeys int64) error {
func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize, regionSplitKeys, minRegionNum int64) error {
var err error

for i := 0; i < importMaxRetryTimes; i++ {
task := engine.logger.With(zap.Int("retryCnt", i)).Begin(zap.InfoLevel, "import")
err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize, regionSplitKeys)
err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize, regionSplitKeys, minRegionNum)
if !common.IsRetryableError(err) {
task.End(zap.ErrorLevel, err)
return err
Expand Down
20 changes: 16 additions & 4 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,7 @@ func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit
return ranges
}

func (local *local) readAndSplitIntoRange(ctx context.Context, engine *Engine, regionSplitSize int64, regionSplitKeys int64) ([]Range, error) {
func (local *local) readAndSplitIntoRange(ctx context.Context, engine *Engine, regionSplitSize int64, regionSplitKeys int64, minRegionNum int64) ([]Range, error) {
iter := engine.newKVIter(ctx, &pebble.IterOptions{})
//nolint: errcheck
defer iter.Close()
Expand Down Expand Up @@ -1280,13 +1280,25 @@ func (local *local) readAndSplitIntoRange(ctx context.Context, engine *Engine, r
engineFileTotalSize := engine.TotalSize.Load()
engineFileLength := engine.Length.Load()

logger := log.FromContext(ctx).With(zap.Stringer("engine", engine.UUID))

if minRegionNum > 0 && engineFileTotalSize/regionSplitSize < minRegionNum {
regionSplitSize = engineFileTotalSize / minRegionNum
logger.Info("enforce minRegionNum",
zap.Int64("totalSize", engineFileTotalSize), zap.Int64("minRegionNum", minRegionNum), zap.Int64("regionSplitSize", regionSplitSize))
}
if minRegionNum > 0 && engineFileLength/regionSplitKeys < minRegionNum {
regionSplitKeys = engineFileLength / minRegionNum
logger.Info("enforce minRegionNum",
zap.Int64("totalCount", engineFileLength), zap.Int64("minRegionNum", minRegionNum), zap.Int64("regionSplitKeys", regionSplitKeys))
}

// <= 96MB no need to split into range
if engineFileTotalSize <= regionSplitSize && engineFileLength <= regionSplitKeys {
ranges := []Range{{start: firstKey, end: endKey}}
return ranges, nil
}

logger := log.FromContext(ctx).With(zap.Stringer("engine", engine.UUID))
sizeProps, err := getSizeProperties(logger, engine.getDB(), local.keyAdapter)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -1607,7 +1619,7 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engine *Engine,
return allErr
}

func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys, minRegionNum int64) error {
lf := local.lockEngine(engineUUID, importMutexStateImport)
if lf == nil {
// skip if engine not exist. See the comment of `CloseEngine` for more detail.
Expand All @@ -1634,7 +1646,7 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi
}

// split sorted file into range by 96MB size per file
ranges, err := local.readAndSplitIntoRange(ctx, lf, regionSplitSize, regionSplitKeys)
ranges, err := local.readAndSplitIntoRange(ctx, lf, regionSplitSize, regionSplitKeys, minRegionNum)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/noop/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (b noopBackend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig,
return nil
}

func (b noopBackend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
// ImportEngine is a no-op: the noop backend discards all written data, so
// there is nothing to import. The regionSplitSize, regionSplitKeys and
// minRegionNum arguments are accepted only to satisfy the AbstractBackend
// interface and are ignored.
func (b noopBackend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys, minRegionNum int64) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/tidb/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ func (be *tidbBackend) ResolveDuplicateRows(ctx context.Context, tbl table.Table
return nil
}

func (be *tidbBackend) ImportEngine(context.Context, uuid.UUID, int64, int64) error {
// ImportEngine is a no-op for the tidb backend — presumably data is applied
// during the write phase rather than via an engine-import step (TODO confirm
// against the write path). The unnamed int64 parameters (regionSplitSize,
// regionSplitKeys, minRegionNum) exist only to satisfy the AbstractBackend
// interface.
func (be *tidbBackend) ImportEngine(context.Context, uuid.UUID, int64, int64, int64) error {
return nil
}

Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ type TikvImporter struct {
SendKVPairs int `toml:"send-kv-pairs" json:"send-kv-pairs"`
RegionSplitSize ByteSize `toml:"region-split-size" json:"region-split-size"`
RegionSplitKeys int `toml:"region-split-keys" json:"region-split-keys"`
MinRegionNum int `toml:"min-region-num" json:"min-region-num"`
SortedKVDir string `toml:"sorted-kv-dir" json:"sorted-kv-dir"`
DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"`
RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"`
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/config/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
// lower the max-key-count to avoid tikv trigger region auto split
SplitRegionSize ByteSize = 96 * units.MiB
SplitRegionKeys int = 1_280_000
MinRegionNum int = 0 // disable minRegionNum by default
MaxSplitRegionSizeRatio int = 10

BufferSizeScale = 5
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2010,7 +2010,7 @@ func (rc *Controller) enforceDiskQuota(ctx context.Context) {
var importErr error
for _, engine := range largeEngines {
// Use a larger split region size to avoid split the same region by many times.
if err := rc.backend.UnsafeImportAndReset(ctx, engine, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio), int64(config.SplitRegionKeys)*int64(config.MaxSplitRegionSizeRatio)); err != nil {
if err := rc.backend.UnsafeImportAndReset(ctx, engine, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio), int64(config.SplitRegionKeys)*int64(config.MaxSplitRegionSizeRatio), int64(config.MinRegionNum)); err != nil {
importErr = multierr.Append(importErr, err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,7 @@ func (tr *TableRestore) importKV(
regionSplitKeys = int64(config.SplitRegionKeys)
}
}
err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys)
err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys, int64(rc.cfg.TikvImporter.MinRegionNum))
saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, engineID, err, checkpoints.CheckpointStatusImported)
// Don't clean up when save checkpoint failed, because we will verifyLocalFile and import engine again after restart.
if err == nil && saveCpErr == nil {
Expand Down
2 changes: 1 addition & 1 deletion ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (bc *BackendContext) Flush(indexID int64) error {
logutil.BgLogger().Info(LitInfoUnsafeImport, zap.Int64("index ID", indexID),
zap.Uint64("current disk usage", bc.diskRoot.CurrentUsage()),
zap.Uint64("max disk quota", bc.diskRoot.MaxQuota()))
err = bc.backend.UnsafeImportAndReset(bc.ctx, ei.uuid, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio), int64(config.SplitRegionKeys))
err = bc.backend.UnsafeImportAndReset(bc.ctx, ei.uuid, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio), int64(config.SplitRegionKeys), int64(config.MinRegionNum))
if err != nil {
logutil.BgLogger().Error(LitErrIngestDataErr, zap.Int64("index ID", indexID),
zap.Error(err), zap.Uint64("current disk usage", bc.diskRoot.CurrentUsage()),
Expand Down
2 changes: 1 addition & 1 deletion ddl/ingest/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (ei *engineInfo) ImportAndClean() error {
logutil.BgLogger().Info(LitInfoStartImport, zap.Int64("job ID", ei.jobID),
zap.Int64("index ID", ei.indexID),
zap.String("split region size", strconv.FormatInt(int64(config.SplitRegionSize), 10)))
err = closeEngine.Import(ei.ctx, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
err = closeEngine.Import(ei.ctx, int64(config.SplitRegionSize), int64(config.SplitRegionKeys), int64(config.MinRegionNum))
if err != nil {
logutil.BgLogger().Error(LitErrIngestDataErr, zap.Error(err),
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
Expand Down

0 comments on commit ac67ddf

Please sign in to comment.