Skip to content

Commit

Permalink
lightning: refactor to better reuse in load data (#42038)
Browse files Browse the repository at this point in the history
ref #40499
  • Loading branch information
D3Hunter authored Mar 9, 2023
1 parent b182db2 commit e059bbd
Show file tree
Hide file tree
Showing 16 changed files with 197 additions and 153 deletions.
22 changes: 17 additions & 5 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type EngineConfig struct {
// TableInfo is the corresponding tidb table info
TableInfo *checkpoints.TidbTableInfo
// local backend specified configuration
Local *LocalEngineConfig
Local LocalEngineConfig
}

// LocalEngineConfig is the configuration used for local backend in OpenEngine.
Expand Down Expand Up @@ -229,6 +229,9 @@ type engine struct {
backend AbstractBackend
logger log.Logger
uuid uuid.UUID
// id of the engine, used to generate uuid and stored in checkpoint
// for index engine it's -1
id int32
}

// OpenedEngine is an opened engine, allowing data to be written via WriteRows.
Expand All @@ -237,6 +240,7 @@ type engine struct {
type OpenedEngine struct {
	// engine carries the backend handle, logger, uuid and id of this engine.
	engine
	// tableName is the table name passed to OpenEngine when this engine was opened.
	tableName string
	// config is the engine config passed to OpenEngine; it is kept so that
	// Close can reuse it instead of requiring the caller to pass it again.
	config *EngineConfig
}

// // import_ the data written to the engine into the target.
Expand Down Expand Up @@ -377,8 +381,10 @@ func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableNam
backend: be.abstract,
logger: logger,
uuid: engineUUID,
id: engineID,
},
tableName: tableName,
config: config,
}, nil
}

Expand All @@ -395,8 +401,8 @@ func (be Backend) ResolveDuplicateRows(ctx context.Context, tbl table.Table, tab
}

// Close the opened engine to prepare it for importing.
func (engine *OpenedEngine) Close(ctx context.Context, cfg *EngineConfig) (*ClosedEngine, error) {
closedEngine, err := engine.unsafeClose(ctx, cfg)
func (engine *OpenedEngine) Close(ctx context.Context) (*ClosedEngine, error) {
closedEngine, err := engine.unsafeClose(ctx, engine.config)
if err == nil {
if m, ok := metric.FromContext(ctx); ok {
m.ImporterEngineCounter.WithLabelValues("closed").Inc()
Expand Down Expand Up @@ -442,19 +448,20 @@ func (w *LocalEngineWriter) IsSynced() bool {
// resuming from a checkpoint.
func (be Backend) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig, tableName string, engineID int32) (*ClosedEngine, error) {
tag, engineUUID := MakeUUID(tableName, engineID)
return be.UnsafeCloseEngineWithUUID(ctx, cfg, tag, engineUUID)
return be.UnsafeCloseEngineWithUUID(ctx, cfg, tag, engineUUID, engineID)
}

// UnsafeCloseEngineWithUUID closes the engine without first opening it.
// This method is "unsafe" as it does not follow the normal operation sequence
// (Open -> Write -> Close -> Import). This method should only be used when one
// knows via other ways that the engine has already been opened, e.g. when
// resuming from a checkpoint.
func (be Backend) UnsafeCloseEngineWithUUID(ctx context.Context, cfg *EngineConfig, tag string, engineUUID uuid.UUID) (*ClosedEngine, error) {
func (be Backend) UnsafeCloseEngineWithUUID(ctx context.Context, cfg *EngineConfig, tag string, engineUUID uuid.UUID, id int32) (*ClosedEngine, error) {
return engine{
backend: be.abstract,
logger: makeLogger(log.FromContext(ctx), tag, engineUUID),
uuid: engineUUID,
id: id,
}.unsafeClose(ctx, cfg)
}

Expand All @@ -468,6 +475,11 @@ func (en engine) unsafeClose(ctx context.Context, cfg *EngineConfig) (*ClosedEng
return &ClosedEngine{engine: en}, nil
}

// GetID returns the engine ID stored in this engine.
func (en engine) GetID() int32 {
	return en.id
}

// Import the data written to the engine into the target.
func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize, regionSplitKeys int64) error {
var err error
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestOpenCloseImportCleanUpEngine(t *testing.T) {
OpenEngine(ctx, &backend.EngineConfig{}, engineUUID).
Return(nil)
closeCall := s.mockBackend.EXPECT().
CloseEngine(ctx, nil, engineUUID).
CloseEngine(ctx, &backend.EngineConfig{}, engineUUID).
Return(nil).
After(openCall)
importCall := s.mockBackend.EXPECT().
Expand All @@ -64,7 +64,7 @@ func TestOpenCloseImportCleanUpEngine(t *testing.T) {

engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`db`.`table`", 1)
require.NoError(t, err)
closedEngine, err := engine.Close(ctx, nil)
closedEngine, err := engine.Close(ctx)
require.NoError(t, err)
err = closedEngine.Import(ctx, 1, 1)
require.NoError(t, err)
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestUnsafeCloseEngineWithUUID(t *testing.T) {
Return(nil).
After(closeCall)

closedEngine, err := s.backend.UnsafeCloseEngineWithUUID(ctx, nil, "some_tag", engineUUID)
closedEngine, err := s.backend.UnsafeCloseEngineWithUUID(ctx, nil, "some_tag", engineUUID, 0)
require.NoError(t, err)
err = closedEngine.Cleanup(ctx)
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"//br/pkg/lightning/log",
"//br/pkg/lightning/manual",
"//br/pkg/lightning/metric",
"//br/pkg/lightning/mydump",
"//br/pkg/lightning/tikv",
"//br/pkg/logutil",
"//br/pkg/membuf",
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,7 @@ func (e *Engine) batchIngestSSTs(metas []*sstMeta) error {
return bytes.Compare(i.minKey, j.minKey) < 0
})

// non overlapping sst is grouped, and ingested in that order
metaLevels := make([][]*sstMeta, 0)
for _, meta := range metas {
inserted := false
Expand Down
6 changes: 1 addition & 5 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,10 +827,6 @@ func (local *local) openEngineDB(engineUUID uuid.UUID, readOnly bool) (*pebble.D

// OpenEngine must be called with holding mutex of Engine.
func (local *local) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, engineUUID uuid.UUID) error {
engineCfg := backend.LocalEngineConfig{}
if cfg.Local != nil {
engineCfg = *cfg.Local
}
db, err := local.openEngineDB(engineUUID, false)
if err != nil {
return err
Expand All @@ -853,7 +849,7 @@ func (local *local) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, e
sstMetasChan: make(chan metaOrFlush, 64),
ctx: engineCtx,
cancel: cancel,
config: engineCfg,
config: cfg.Local,
tableInfo: cfg.TableInfo,
duplicateDetection: local.duplicateDetection,
dupDetectOpt: local.duplicateDetectOpt,
Expand Down
52 changes: 52 additions & 0 deletions br/pkg/lightning/backend/local/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/mathutil"
"go.uber.org/multierr"
Expand Down Expand Up @@ -676,3 +678,53 @@ func (noopStoreWriteLimiter) WaitN(ctx context.Context, storeID uint64, n int) e
func (noopStoreWriteLimiter) Limit() int {
return math.MaxInt
}

const (
	// CompactionLowerThreshold is the smallest compaction threshold (512 MiB)
	// that EstimateCompactionThreshold will return.
	CompactionLowerThreshold = 512 * units.MiB
	// CompactionUpperThreshold is the largest compaction threshold (32 GiB)
	// that EstimateCompactionThreshold will return.
	CompactionUpperThreshold = 32 * units.GiB
)

// EstimateCompactionThreshold estimates the SST compaction threshold from the
// total raw size of the source files referenced by the table checkpoint.
//
// A higher threshold makes each compaction take longer but shortens the later
// iteration, so the estimate aims to keep the total number of SST files at
// roughly 512. Because compacting 32GB of SST files costs about 20 minutes,
// the result is capped at CompactionUpperThreshold; it is also never lower
// than CompactionLowerThreshold.
//
// files supplies the real size of each source file, keyed by path; a chunk
// whose path is not found there falls back to the file size recorded in the
// checkpoint. factor scales the total raw size: per the original author it is
// 1 for a data engine and the number of non-clustered indexes for an index
// engine.
func EstimateCompactionThreshold(files []mydump.FileInfo, cp *checkpoints.TableCheckpoint, factor int64) int64 {
	// Index the real sizes by path so each chunk lookup is a single map access.
	realSizeByPath := make(map[string]int64, len(files))
	for i := range files {
		realSizeByPath[files[i].FileMeta.Path] = files[i].FileMeta.RealSize
	}

	var (
		rawTotal int64
		prevPath string
	)
	for _, engineCp := range cp.Engines {
		for _, chunk := range engineCp.Chunks {
			path := chunk.FileMeta.Path
			// Consecutive chunks of the same file are counted only once.
			if path == prevPath {
				continue
			}
			prevPath = path

			fileSize, known := realSizeByPath[path]
			if !known {
				fileSize = chunk.FileMeta.FileSize
			}
			if chunk.FileMeta.Type == mydump.SourceTypeParquet {
				// A parquet file is compressed, so estimate its raw size as twice the file size.
				fileSize *= 2
			}
			rawTotal += fileSize
		}
	}

	// Try to restrict the total file number to within 512.
	threshold := utils.NextPowerOfTwo(rawTotal * factor / 512)
	switch {
	case threshold < CompactionLowerThreshold:
		// Too many small SST files would make the region range estimation inaccurate.
		return CompactionLowerThreshold
	case threshold > CompactionUpperThreshold:
		return CompactionUpperThreshold
	default:
		return threshold
	}
}
4 changes: 4 additions & 0 deletions br/pkg/lightning/checkpoints/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ func (ccp *ChunkCheckpoint) FinishedSize() int64 {
return ccp.Chunk.RealOffset - ccp.Key.Offset
}

// GetKey returns the string form of this chunk checkpoint's key.
func (ccp *ChunkCheckpoint) GetKey() string {
	return ccp.Key.String()
}

type EngineCheckpoint struct {
Status CheckpointStatus
Chunks []*ChunkCheckpoint // a sorted array
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/lightning/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "common",
srcs = [
"common.go",
"conn.go",
"errors.go",
"once_error.go",
Expand All @@ -22,6 +23,8 @@ go_library(
"//br/pkg/lightning/log",
"//br/pkg/utils",
"//errno",
"//kv",
"//meta/autoid",
"//parser/model",
"//store/driver/error",
"//table/tables",
Expand Down
79 changes: 79 additions & 0 deletions br/pkg/lightning/common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
)

const (
	// IndexEngineID is the engine ID that denotes the index engine.
	IndexEngineID = -1
)

// AllocGlobalAutoID requests n auto IDs for the given table from the global
// allocator chosen by getGlobalAutoIDAlloc and returns the allocator's result
// as (autoIDBase, autoIDMax).
//
// NOTE(review): the two trailing 1 arguments passed to Alloc are presumably
// the increment and the offset; confirm against the autoid.Allocator interface.
func AllocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int64, tblInfo *model.TableInfo) (autoIDBase, autoIDMax int64, err error) {
	allocator, allocErr := getGlobalAutoIDAlloc(store, dbID, tblInfo)
	if allocErr != nil {
		return 0, 0, allocErr
	}
	autoIDBase, autoIDMax, err = allocator.Alloc(ctx, uint64(n), 1, 1)
	return autoIDBase, autoIDMax, err
}

// RebaseGlobalAutoID rebases the global auto ID allocator of the given table
// to newBase. The allocator is the one chosen by getGlobalAutoIDAlloc; any
// error from obtaining it or from the rebase itself is returned unchanged.
func RebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, dbID int64, tblInfo *model.TableInfo) error {
	allocator, allocErr := getGlobalAutoIDAlloc(store, dbID, tblInfo)
	if allocErr != nil {
		return allocErr
	}
	rebaseErr := allocator.Rebase(ctx, newBase, false)
	return rebaseErr
}

// getGlobalAutoIDAlloc builds the auto ID allocator to use for the given table.
//
// It returns an error when store is nil, when dbID is 0, or when the table has
// neither an auto row ID, an auto-increment column nor an auto-random column.
func getGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo) (autoid.Allocator, error) {
	switch {
	case store == nil:
		return nil, errors.New("internal error: kv store should not be nil")
	case dbID == 0:
		return nil, errors.New("internal error: dbID should not be 0")
	}

	usesRowID := TableHasAutoRowID(tblInfo)
	usesAutoInc := tblInfo.GetAutoIncrementColInfo() != nil
	usesAutoRand := tblInfo.ContainsAutoRandomBits()

	// No autoid cache is needed because all IDs are allocated at once.
	// The argument of CustomAutoIncCacheOption is the cache step. Step 1 would
	// mean no cache, but it also enables an experimental feature, so step 2 is
	// used instead.
	//
	// See https://github.com/pingcap/tidb/issues/38442 for more details.
	cacheOpt := autoid.CustomAutoIncCacheOption(2)
	versionOpt := autoid.AllocOptionTableInfoVersion(tblInfo.Version)

	// Current TiDB has some limitations for auto ID.
	// 1. Auto increment ID and auto row ID share the same RowID allocator. See https://github.com/pingcap/tidb/issues/982.
	// 2. An auto random column must be a clustered primary key, so a table with an auto random column has no implicit row ID.
	// 3. There is at most one auto column in a table.
	// Therefore a single auto column is assumed, and the RowID allocator is preferred when applicable.
	if usesRowID || usesAutoInc {
		rowIDAlloc := autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), autoid.RowIDAllocType, cacheOpt, versionOpt)
		return rowIDAlloc, nil
	}
	if usesAutoRand {
		autoRandAlloc := autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType, cacheOpt, versionOpt)
		return autoRandAlloc, nil
	}
	return nil, errors.Errorf("internal error: table %s has no auto ID", tblInfo.Name)
}
9 changes: 1 addition & 8 deletions br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@ const (
defaultGCLifeTime = 100 * time.Hour
)

const (
indexEngineID = -1
)

const (
compactStateIdle int32 = iota
compactStateDoing
Expand Down Expand Up @@ -115,9 +111,6 @@ const (
cluster_avail BIGINT(20) UNSIGNED NOT NULL DEFAULT 0,
PRIMARY KEY (task_id)
);`

compactionLowerThreshold = 512 * units.MiB
compactionUpperThreshold = 32 * units.GiB
)

var (
Expand Down Expand Up @@ -935,7 +928,7 @@ func (rc *Controller) estimateChunkCountIntoMetrics(ctx context.Context) error {
if eCp.Status < checkpoints.CheckpointStatusImported {
estimatedEngineCnt++
}
if engineID == indexEngineID {
if engineID == common.IndexEngineID {
continue
}
for _, c := range eCp.Chunks {
Expand Down
Loading

0 comments on commit e059bbd

Please sign in to comment.