diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index ff7d9aa1810c6..bb7f5892b4c57 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -93,7 +93,7 @@ const ( gRPCBackOffMaxDelay = 10 * time.Minute // See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360 - regionMaxKeyCount = 1_440_000 + regionMaxKeyCount = 1_280_000 defaultRegionSplitSize = 96 * units.MiB propRangeIndex = "tikv.range_index" diff --git a/ddl/index.go b/ddl/index.go index 3e802975cd746..cbfaabf25c25f 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -17,12 +17,13 @@ package ddl import ( "context" "fmt" - "github.com/pingcap/tidb/ddl/sst" - "github.com/pingcap/tidb/util/sqlexec" "strings" "sync/atomic" "time" + "github.com/pingcap/tidb/ddl/sst" + "github.com/pingcap/tidb/util/sqlexec" + tableutil "github.com/pingcap/tidb/table/tables/util" "github.com/pingcap/errors" @@ -555,7 +556,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo case model.StateWriteReorganization: // TODO: optimize index ddl. if *sst.IndexDDLLightning { - sst.PrepareIndexOp(w.ctx, sst.DDLInfo{job.SchemaName, tblInfo, job.StartTS}) + err = sst.PrepareIndexOp(w.ctx, sst.DDLInfo{job.SchemaName, tblInfo, job.StartTS}) if err != nil { return ver, errors.Trace(fmt.Errorf("PrepareIndexOp err:%w", err)) } diff --git a/ddl/sst/common.go b/ddl/sst/common.go index 918c383d46200..f07b0fec297e1 100644 --- a/ddl/sst/common.go +++ b/ddl/sst/common.go @@ -10,11 +10,16 @@ import ( "sync/atomic" "time" + "github.com/docker/go-units" + sstpb "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/local" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" + "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/glue" + "github.com/pingcap/tidb/br/pkg/lightning/tikv" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/util/logutil" @@ -116,7 +121,7 @@ func (_ glue_) Record(string, uint64) { } func generateLightningConfig(info ClusterInfo) *config.Config { - cfg := config.Config{} + cfg := config.NewConfig() cfg.DefaultVarsForImporterAndLocalBackend() name, err := ioutil.TempDir(*sortkv, "lightning") if err != nil { @@ -125,14 +130,16 @@ func generateLightningConfig(info ClusterInfo) *config.Config { } os.Remove(name) LogDebug("./ %s.", name) - // cfg.TikvImporter.RangeConcurrency = 32 cfg.Checkpoint.Enable = false cfg.TikvImporter.SortedKVDir = name + cfg.TikvImporter.RangeConcurrency = 32 + cfg.TikvImporter.EngineMemCacheSize = 512 * units.MiB + cfg.TikvImporter.LocalWriterMemCacheSize = 128 * units.MiB cfg.TikvImporter.DuplicateResolution = config.DupeResAlgNone cfg.TiDB.PdAddr = info.PdAddr cfg.TiDB.Host = "127.0.0.1" cfg.TiDB.StatusPort = int(info.Status) - return &cfg + return cfg } func createLocalBackend(ctx context.Context, info ClusterInfo) (backend.Backend, error) { @@ -144,3 +151,26 @@ func createLocalBackend(ctx context.Context, info ClusterInfo) (backend.Backend, var g glue_ return local.NewLocalBackend(ctx, tls, cfg, &g, int(limit), nil) } + +func switchTiKVMode(tls *common.TLS, ctx context.Context, mode sstpb.SwitchMode) { + // It is fine if we miss some stores which did not switch to Import mode, + // since we're running it periodically, so we exclude disconnected stores. + // But it is essential all stores be switched back to Normal mode to allow + // normal operation. + var minState tikv.StoreState + if mode == sstpb.SwitchMode_Import { + minState = tikv.StoreStateOffline + } else { + minState = tikv.StoreStateDisconnected + } + // we ignore switch mode failure since it is not fatal. + // no need log the error, it is done in kv.SwitchMode already. + _ = tikv.ForAllStores( + ctx, + tls, + minState, + func(c context.Context, store *tikv.Store) error { + return tikv.SwitchMode(c, tls, store.Address, mode) + }, + ) +} diff --git a/ddl/sst/index.go b/ddl/sst/index.go index 91367df6899f6..d3f6724764250 100644 --- a/ddl/sst/index.go +++ b/ddl/sst/index.go @@ -6,6 +6,8 @@ import ( "encoding/json" "fmt" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/lightning/backend/local" "github.com/pingcap/tidb/util/sqlexec" @@ -192,8 +194,14 @@ func FinishIndexOp(ctx context.Context, startTs uint64, exec sqlexec.RestrictedS // closeEngine, err1 := indexEngine.Close(ctx, cfg) if err1 != nil { - return fmt.Errorf("engine.Close err:%w", err1) - } + return errors.Annotate(err1, "engine.Close err") + } + //tls, err := common.NewTLS("", "", "", cluster.PdAddr) + //if err != nil { + // return errors.Annotate(err, "fail to create tls") + //} + //switchTiKVMode(tls, ctx, sstpb.SwitchMode_Import) + //defer switchTiKVMode(tls, ctx, sstpb.SwitchMode_Normal) // use default value first; err = closeEngine.Import(ctx, int64(config.SplitRegionSize)) if err != nil {