Skip to content

Commit

Permalink
Merge pull request pingcap#12 from lichunzhu/switchImportMode
Browse files Browse the repository at this point in the history
adjust lightning arguments to increase speed
  • Loading branch information
lichunzhu authored Jan 9, 2022
2 parents e950ee2 + eca2214 commit cc662a4
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 9 deletions.
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ const (
gRPCBackOffMaxDelay = 10 * time.Minute

// See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360
regionMaxKeyCount = 1_440_000
regionMaxKeyCount = 1_280_000
defaultRegionSplitSize = 96 * units.MiB

propRangeIndex = "tikv.range_index"
Expand Down
7 changes: 4 additions & 3 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ package ddl
import (
"context"
"fmt"
"github.com/pingcap/tidb/ddl/sst"
"github.com/pingcap/tidb/util/sqlexec"
"strings"
"sync/atomic"
"time"

"github.com/pingcap/tidb/ddl/sst"
"github.com/pingcap/tidb/util/sqlexec"

tableutil "github.com/pingcap/tidb/table/tables/util"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -555,7 +556,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
case model.StateWriteReorganization:
// TODO: optimize index ddl.
if *sst.IndexDDLLightning {
sst.PrepareIndexOp(w.ctx, sst.DDLInfo{job.SchemaName, tblInfo, job.StartTS})
err = sst.PrepareIndexOp(w.ctx, sst.DDLInfo{job.SchemaName, tblInfo, job.StartTS})
if err != nil {
return ver, errors.Trace(fmt.Errorf("PrepareIndexOp err:%w", err))
}
Expand Down
36 changes: 33 additions & 3 deletions ddl/sst/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@ import (
"sync/atomic"
"time"

"github.com/docker/go-units"
sstpb "github.com/pingcap/kvproto/pkg/import_sstpb"

"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/glue"
"github.com/pingcap/tidb/br/pkg/lightning/tikv"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -116,7 +121,7 @@ func (_ glue_) Record(string, uint64) {
}

func generateLightningConfig(info ClusterInfo) *config.Config {
cfg := config.Config{}
cfg := config.NewConfig()
cfg.DefaultVarsForImporterAndLocalBackend()
name, err := ioutil.TempDir(*sortkv, "lightning")
if err != nil {
Expand All @@ -125,14 +130,16 @@ func generateLightningConfig(info ClusterInfo) *config.Config {
}
os.Remove(name)
LogDebug("./ %s.", name)
// cfg.TikvImporter.RangeConcurrency = 32
cfg.Checkpoint.Enable = false
cfg.TikvImporter.SortedKVDir = name
cfg.TikvImporter.RangeConcurrency = 32
cfg.TikvImporter.EngineMemCacheSize = 512 * units.MiB
cfg.TikvImporter.LocalWriterMemCacheSize = 128 * units.MiB
cfg.TikvImporter.DuplicateResolution = config.DupeResAlgNone
cfg.TiDB.PdAddr = info.PdAddr
cfg.TiDB.Host = "127.0.0.1"
cfg.TiDB.StatusPort = int(info.Status)
return &cfg
return cfg
}

func createLocalBackend(ctx context.Context, info ClusterInfo) (backend.Backend, error) {
Expand All @@ -144,3 +151,26 @@ func createLocalBackend(ctx context.Context, info ClusterInfo) (backend.Backend,
var g glue_
return local.NewLocalBackend(ctx, tls, cfg, &g, int(limit), nil)
}

func switchTiKVMode(tls *common.TLS, ctx context.Context, mode sstpb.SwitchMode) {
// It is fine if we miss some stores which did not switch to Import mode,
// since we're running it periodically, so we exclude disconnected stores.
// But it is essential all stores be switched back to Normal mode to allow
// normal operation.
var minState tikv.StoreState
if mode == sstpb.SwitchMode_Import {
minState = tikv.StoreStateOffline
} else {
minState = tikv.StoreStateDisconnected
}
// we ignore switch mode failure since it is not fatal.
// no need log the error, it is done in kv.SwitchMode already.
_ = tikv.ForAllStores(
ctx,
tls,
minState,
func(c context.Context, store *tikv.Store) error {
return tikv.SwitchMode(c, tls, store.Address, mode)
},
)
}
12 changes: 10 additions & 2 deletions ddl/sst/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"encoding/json"
"fmt"

"github.com/pingcap/errors"

"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/util/sqlexec"

Expand Down Expand Up @@ -192,8 +194,14 @@ func FinishIndexOp(ctx context.Context, startTs uint64, exec sqlexec.RestrictedS
//
closeEngine, err1 := indexEngine.Close(ctx, cfg)
if err1 != nil {
return fmt.Errorf("engine.Close err:%w", err1)
}
return errors.Annotate(err1, "engine.Close err")
}
//tls, err := common.NewTLS("", "", "", cluster.PdAddr)
//if err != nil {
// return errors.Annotate(err, "fail to create tls")
//}
//switchTiKVMode(tls, ctx, sstpb.SwitchMode_Import)
//defer switchTiKVMode(tls, ctx, sstpb.SwitchMode_Normal)
// use default value first;
err = closeEngine.Import(ctx, int64(config.SplitRegionSize))
if err != nil {
Expand Down

0 comments on commit cc662a4

Please sign in to comment.