diff --git a/br/pkg/config/kv.go b/br/pkg/config/kv.go
new file mode 100644
index 0000000000000..e211887099854
--- /dev/null
+++ b/br/pkg/config/kv.go
@@ -0,0 +1,74 @@
+// Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0.
+package config
+
+import (
+	"encoding/json"
+
+	"github.com/docker/go-units"
+)
+
+type ConfigTerm[T uint | uint64] struct {
+	Value    T
+	Modified bool
+}
+
+type KVConfig struct {
+	ImportGoroutines    ConfigTerm[uint]
+	MergeRegionSize     ConfigTerm[uint64]
+	MergeRegionKeyCount ConfigTerm[uint64]
+}
+
+func ParseImportThreadsFromConfig(resp []byte) (uint, error) {
+	type importer struct {
+		Threads uint `json:"num-threads"`
+	}
+
+	type config struct {
+		Import importer `json:"import"`
+	}
+	var c config
+	e := json.Unmarshal(resp, &c)
+	if e != nil {
+		return 0, e
+	}
+
+	return c.Import.Threads, nil
+}
+
+func ParseMergeRegionSizeFromConfig(resp []byte) (uint64, uint64, error) {
+	type coprocessor struct {
+		RegionSplitSize string `json:"region-split-size"`
+		RegionSplitKeys uint64 `json:"region-split-keys"`
+	}
+
+	type config struct {
+		Cop coprocessor `json:"coprocessor"`
+	}
+	var c config
+	e := json.Unmarshal(resp, &c)
+	if e != nil {
+		return 0, 0, e
+	}
+	rs, e := units.RAMInBytes(c.Cop.RegionSplitSize)
+	if e != nil {
+		return 0, 0, e
+	}
+	urs := uint64(rs)
+	return urs, c.Cop.RegionSplitKeys, nil
+}
+
+func ParseLogBackupEnableFromConfig(resp []byte) (bool, error) {
+	type logbackup struct {
+		Enable bool `json:"enable"`
+	}
+
+	type config struct {
+		LogBackup logbackup `json:"log-backup"`
+	}
+	var c config
+	e := json.Unmarshal(resp, &c)
+	if e != nil {
+		return false, e
+	}
+	return c.LogBackup.Enable, nil
+}
diff --git a/br/pkg/conn/BUILD.bazel b/br/pkg/conn/BUILD.bazel
index f6065e6757506..b027c162b5493 100644
--- a/br/pkg/conn/BUILD.bazel
+++ b/br/pkg/conn/BUILD.bazel
@@ -44,6 +44,10 @@ go_test(
     ],
     embed = [":conn"],
     flaky = True,
+<<<<<<< HEAD
+=======
+    shard_count = 8,
+>>>>>>> 8973dddc9ed (br: no domain to run log command (#52127))
     deps = [
         "//br/pkg/conn/util",
         "//br/pkg/pdutil",
diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go
index 38461c4dd5652..4540f8244683a 100644
--- a/br/pkg/conn/conn.go
+++ b/br/pkg/conn/conn.go
@@ -327,6 +327,25 @@ func (mgr *Mgr) GetMergeRegionSizeAndCount(ctx context.Context, client *http.Cli
 	return regionSplitSize, regionSplitKeys
 }
 
+// IsLogBackupEnabled is used for br to check whether tikv has enabled log backup.
+func (mgr *Mgr) IsLogBackupEnabled(ctx context.Context, client *http.Client) (bool, error) {
+	logbackupEnable := true
+	err := mgr.GetConfigFromTiKV(ctx, client, func(resp *http.Response) error {
+		respBytes, err := io.ReadAll(resp.Body)
+		if err != nil {
+			return err
+		}
+		enable, err := kvconfig.ParseLogBackupEnableFromConfig(respBytes)
+		if err != nil {
+			log.Warn("Failed to parse log-backup enable from config", logutil.ShortError(err))
+			return err
+		}
+		logbackupEnable = logbackupEnable && enable
+		return nil
+	})
+	return logbackupEnable, errors.Trace(err)
+}
+
 // GetConfigFromTiKV get configs from all alive tikv stores.
 func (mgr *Mgr) GetConfigFromTiKV(ctx context.Context, cli *http.Client, fn func(*http.Response) error) error {
 	allStores, err := GetAllTiKVStoresWithRetry(ctx, mgr.GetPDClient(), util.SkipTiFlash)
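A minimal sketch of how the parsers added in `br/pkg/config/kv.go` consume a TiKV `/config` response. The trimmed JSON payload is illustrative (a real response carries many more fields, which `json.Unmarshal` simply ignores), and the `kvconfig` alias is assumed to resolve to `br/pkg/config`, as in the `conn.go` hunk above:

```go
package main

import (
	"fmt"

	kvconfig "github.com/pingcap/tidb/br/pkg/config"
)

func main() {
	// A trimmed-down /config payload; only the fields the parsers declare matter.
	resp := []byte(`{
		"coprocessor": {"region-split-size": "96MiB", "region-split-keys": 960000},
		"log-backup": {"enable": true}
	}`)

	size, keys, err := kvconfig.ParseMergeRegionSizeFromConfig(resp)
	if err != nil {
		panic(err)
	}
	// units.RAMInBytes is binary, so "96MiB" parses to 96 * 1024 * 1024.
	fmt.Println(size, keys) // 100663296 960000

	enabled, err := kvconfig.ParseLogBackupEnableFromConfig(resp)
	if err != nil {
		panic(err)
	}
	fmt.Println(enabled) // true
}
```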
diff --git a/br/pkg/conn/conn_test.go b/br/pkg/conn/conn_test.go
index d9544cb32eafc..531f8cbb75244
--- a/br/pkg/conn/conn_test.go
+++ b/br/pkg/conn/conn_test.go
@@ -436,6 +436,169 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) {
 	}
 }
 
+func TestIsLogBackupEnabled(t *testing.T) {
+	cases := []struct {
+		stores  []*metapb.Store
+		content []string
+		enable  bool
+		err     bool
+	}{
+		{
+			stores: []*metapb.Store{
+				{
+					Id:    1,
+					State: metapb.StoreState_Up,
+					Labels: []*metapb.StoreLabel{
+						{
+							Key:   "engine",
+							Value: "tiflash",
+						},
+					},
+				},
+			},
+			content: []string{""},
+			enable:  true,
+			err:     false,
+		},
+		{
+			stores: []*metapb.Store{
+				{
+					Id:    1,
+					State: metapb.StoreState_Up,
+					Labels: []*metapb.StoreLabel{
+						{
+							Key:   "engine",
+							Value: "tiflash",
+						},
+					},
+				},
+				{
+					Id:    2,
+					State: metapb.StoreState_Up,
+					Labels: []*metapb.StoreLabel{
+						{
+							Key:   "engine",
+							Value: "tikv",
+						},
+					},
+				},
+			},
+			content: []string{
+				"",
+				// Assume this TiKV store has failed for some reason.
+				"",
+			},
+			enable: false,
+			err:    true,
+		},
+		{
+			stores: []*metapb.Store{
+				{
+					Id:    1,
+					State: metapb.StoreState_Up,
+					Labels: []*metapb.StoreLabel{
+						{
+							Key:   "engine",
+							Value: "tikv",
+						},
+					},
+				},
+			},
+			content: []string{
+				"{\"log-level\": \"debug\", \"log-backup\": {\"enable\": true}}",
+			},
+			enable: true,
+			err:    false,
+		},
+		{
+			stores: []*metapb.Store{
+				{
+					Id:    1,
+					State: metapb.StoreState_Up,
+					Labels: []*metapb.StoreLabel{
+						{
+							Key:   "engine",
+							Value: "tikv",
+						},
+					},
+				},
+			},
+			content: []string{
+				"{\"log-level\": \"debug\", \"log-backup\": {\"enable\": false}}",
+			},
+			enable: false,
+			err:    false,
+		},
+		{
+			stores: []*metapb.Store{
+				{
+					Id:    1,
+					State: metapb.StoreState_Up,
+					Labels: []*metapb.StoreLabel{
+						{
+							Key:   "engine",
+							Value: "tikv",
+						},
+					},
+				},
+				{
+					Id:    2,
+					State: metapb.StoreState_Up,
+					Labels: []*metapb.StoreLabel{
+						{
+							Key:   "engine",
+							Value: "tikv",
+						},
+					},
+				},
+			},
+			content: []string{
+				"{\"log-level\": \"debug\", \"log-backup\": {\"enable\": true}}",
+				"{\"log-level\": \"debug\", \"log-backup\": {\"enable\": false}}",
+			},
+			enable: false,
+			err:    false,
+		},
+	}
+
+	pctx := context.Background()
+	for _, ca := range cases {
+		ctx, cancel := context.WithCancel(pctx)
+		pdCli := utils.FakePDClient{Stores: ca.stores}
+		require.Equal(t, len(ca.content), len(ca.stores))
+		count := 0
+		mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			switch strings.TrimSpace(r.URL.Path) {
+			case "/config":
+				if len(ca.content[count]) == 0 {
+					cancel()
+				}
+				_, _ = fmt.Fprint(w, ca.content[count])
+			default:
+				http.NotFoundHandler().ServeHTTP(w, r)
+			}
+			count++
+		}))
+
+		for _, s := range ca.stores {
+			s.Address = mockServer.URL
+			s.StatusAddress = mockServer.URL
+		}
+
+		httpCli := mockServer.Client()
+		mgr := &Mgr{PdController: &pdutil.PdController{}}
+		mgr.PdController.SetPDClient(pdCli)
+		enable, err := mgr.IsLogBackupEnabled(ctx, httpCli)
+		if ca.err {
+			require.Error(t, err)
+		} else {
+			require.NoError(t, err)
+			require.Equal(t, ca.enable, enable)
+		}
+		mockServer.Close()
+	}
+}
+
 func TestHandleTiKVAddress(t *testing.T) {
 	cases := []struct {
 		store *metapb.Store
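The test above exercises `IsLogBackupEnabled` end to end through the `Mgr` plumbing. The same mocking idea in isolation, with `httptest` standing in for a TiKV status port (the path and payload are assumptions taken from the test):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"

	kvconfig "github.com/pingcap/tidb/br/pkg/config"
)

func main() {
	// Serve a canned config on /config, the endpoint GetConfigFromTiKV
	// polls on every store's status address.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path == "/config" {
			fmt.Fprint(w, `{"log-level": "debug", "log-backup": {"enable": true}}`)
			return
		}
		http.NotFound(w, r)
	}))
	defer srv.Close()

	resp, err := srv.Client().Get(srv.URL + "/config")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	enabled, err := kvconfig.ParseLogBackupEnableFromConfig(body)
	fmt.Println(enabled, err) // true <nil>
}
```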
diff --git a/br/pkg/stream/stream_status.go b/br/pkg/stream/stream_status.go
index d5aeb74b52cf0..65f00e9cec096
--- a/br/pkg/stream/stream_status.go
+++ b/br/pkg/stream/stream_status.go
@@ -342,6 +342,15 @@ func NewStatusController(meta *MetaDataClient, mgr PDInfoProvider, view TaskPrin
 	}
 }
 
+func (ctl *StatusController) Close() error {
+	if ctl.meta != nil {
+		if err := ctl.meta.Close(); err != nil {
+			return errors.Trace(err)
+		}
+	}
+	return nil
+}
+
 // fillTask queries and fills the extra information for a raw task.
 func (ctl *StatusController) fillTask(ctx context.Context, task Task) (TaskStatus, error) {
 	var err error
diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go
index e78647328380d..9abf37d4b8977
--- a/br/pkg/task/stream.go
+++ b/br/pkg/task/stream.go
@@ -296,7 +296,7 @@ type streamMgr struct {
 func NewStreamMgr(ctx context.Context, cfg *StreamConfig, g glue.Glue, isStreamStart bool) (*streamMgr, error) {
 	mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
-		cfg.CheckRequirements, true, conn.StreamVersionChecker)
+		cfg.CheckRequirements, false, conn.StreamVersionChecker)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -372,6 +372,24 @@ func (s *streamMgr) adjustAndCheckStartTS(ctx context.Context) error {
 	return nil
 }
 
+<<<<<<< HEAD
+=======
+// checkImportTaskRunning checks whether there is any import task running.
+func (s *streamMgr) checkImportTaskRunning(ctx context.Context, etcdCLI *clientv3.Client) error {
+	list, err := utils.GetImportTasksFrom(ctx, etcdCLI)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	if !list.Empty() {
+		return errors.Errorf("There are some lightning/restore tasks running: %s. "+
+			"Please stop them or wait for them to finish first. "+
+			"If a lightning/restore task was forcibly terminated by the system, "+
+			"please wait for its TTL to decrease to 0.", list.MessageToUser())
+	}
+	return nil
+}
+
+>>>>>>> 8973dddc9ed (br: no domain to run log command (#52127))
 // setGCSafePoint sets the server safe point to PD.
 func (s *streamMgr) setGCSafePoint(ctx context.Context, sp utils.BRServiceSafePoint) error {
 	err := utils.CheckGCSafePoint(ctx, s.mgr.GetPDClient(), sp.BackupTS)
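`checkImportTaskRunning` refuses to start a log task while lightning/restore registrations exist in etcd; those registrations are lease-backed, which is why the message tells users to wait for the TTL to run down after a forced kill. A conceptual sketch of such a lease-backed registration, with an invented key prefix and TTL (br's real registration helpers live in `br/pkg/utils`):

```go
package main

import (
	"context"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// registerImportTask mimics how a lightning/restore task can announce itself:
// the key only lives as long as its lease is kept alive, so when a task is
// killed without cleanup, the registration lingers until the TTL expires.
// That is the "wait for its TTL to decrease to 0" case in the error above.
// The key prefix and TTL here are illustrative, not br's actual values.
func registerImportTask(ctx context.Context, cli *clientv3.Client, name string) error {
	lease, err := cli.Grant(ctx, int64((3 * time.Minute).Seconds()))
	if err != nil {
		return err
	}
	_, err = cli.Put(ctx, "/tidb/brie/import/"+name, "running", clientv3.WithLease(lease.ID))
	return err
}
```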
@@ -389,7 +407,7 @@ func (s *streamMgr) setGCSafePoint(ctx context.Context, sp utils.BRServiceSafePo
 	return nil
 }
 
-func (s *streamMgr) buildObserveRanges(ctx context.Context) ([]kv.KeyRange, error) {
+func (s *streamMgr) buildObserveRanges() ([]kv.KeyRange, error) {
 	dRanges, err := stream.BuildObserveDataRanges(
 		s.mgr.GetStorage(),
 		s.cfg.FilterStr,
@@ -409,13 +427,13 @@ func (s *streamMgr) buildObserveRanges(ctx context.Context) ([]kv.KeyRange, erro
 	return rs, nil
 }
 
-func (s *streamMgr) backupFullSchemas(ctx context.Context, g glue.Glue) error {
+func (s *streamMgr) backupFullSchemas(ctx context.Context) error {
 	clusterVersion, err := s.mgr.GetClusterVersion(ctx)
 	if err != nil {
 		return errors.Trace(err)
 	}
 
-	metaWriter := metautil.NewMetaWriter(s.bc.GetStorage(), metautil.MetaFileSize, false, metautil.MetaFile, nil)
+	metaWriter := metautil.NewMetaWriter(s.bc.GetStorage(), metautil.MetaFileSize, true, metautil.MetaFile, nil)
 	metaWriter.Update(func(m *backuppb.BackupMeta) {
 		// save log startTS to backupmeta file
 		m.StartVersion = s.cfg.StartTS
@@ -441,6 +459,7 @@ func (s *streamMgr) backupFullSchemas(ctx context.Context, g glue.Glue) error {
 	return nil
 }
 
+<<<<<<< HEAD
 func (s *streamMgr) checkStreamStartEnable(g glue.Glue) error {
 	se, err := g.CreateSession(s.mgr.GetStorage())
 	if err != nil {
@@ -448,6 +467,10 @@ func (s *streamMgr) checkStreamStartEnable(g glue.Glue) error {
 	}
 	execCtx := se.GetSessionCtx().(sqlexec.RestrictedSQLExecutor)
 	supportStream, err := utils.IsLogBackupEnabled(execCtx)
+=======
+func (s *streamMgr) checkStreamStartEnable(ctx context.Context) error {
+	supportStream, err := s.mgr.IsLogBackupEnabled(ctx, s.httpCli)
+>>>>>>> 8973dddc9ed (br: no domain to run log command (#52127))
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -546,14 +569,30 @@ func RunStreamStart(
 	}
 	defer streamMgr.close()
 
-	if err = streamMgr.checkStreamStartEnable(g); err != nil {
+	if err = streamMgr.checkStreamStartEnable(ctx); err != nil {
 		return errors.Trace(err)
 	}
 	if err = streamMgr.adjustAndCheckStartTS(ctx); err != nil {
 		return errors.Trace(err)
 	}
 
+<<<<<<< HEAD
 	cli := streamhelper.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient())
+=======
+	etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	cli := streamhelper.NewMetaDataClient(etcdCLI)
+	defer func() {
+		if closeErr := cli.Close(); closeErr != nil {
+			log.Warn("failed to close etcd client", zap.Error(closeErr))
+		}
+	}()
+	if err = streamMgr.checkImportTaskRunning(ctx, cli.Client); err != nil {
+		return errors.Trace(err)
+	}
+>>>>>>> 8973dddc9ed (br: no domain to run log command (#52127))
 	// It supports single stream log task currently.
 	if count, err := cli.GetTaskCount(ctx); err != nil {
 		return errors.Trace(err)
 	}
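The recurring shape in this and the following hunks: `NewStreamMgr` now passes `false` for what appears to be the needDomain flag of `NewMgr`, so there is no domain-owned etcd client to borrow, and each stream command dials its own client via `dialEtcdWithCfg` and must close it. A bare-bones stand-in, assuming plain non-TLS PD endpoints and omitting keepalive options:

```go
package main

import (
	"context"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// dialEtcd is a minimal stand-in for dialEtcdWithCfg: PD endpoints double as
// etcd endpoints; TLS and keepalive settings are left out of this sketch.
func dialEtcd(ctx context.Context, pdAddrs []string) (*clientv3.Client, error) {
	return clientv3.New(clientv3.Config{
		Endpoints:   pdAddrs,
		Context:     ctx,
		DialTimeout: 5 * time.Second,
	})
}

func main() {
	cli, err := dialEtcd(context.Background(), []string{"127.0.0.1:2379"})
	if err != nil {
		log.Fatal(err)
	}
	// Mirrors the deferred close-with-warning added to every stream command:
	// the client is owned here, not borrowed from a domain, so it must be closed.
	defer func() {
		if closeErr := cli.Close(); closeErr != nil {
			log.Printf("failed to close etcd client: %v", closeErr)
		}
	}()
	// ... use cli, e.g. wrap it in streamhelper.NewMetaDataClient ...
}
```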
@@ -602,12 +641,12 @@ func RunStreamStart(
 		if err = streamMgr.setLock(ctx); err != nil {
 			return errors.Trace(err)
 		}
-		if err = streamMgr.backupFullSchemas(ctx, g); err != nil {
+		if err = streamMgr.backupFullSchemas(ctx); err != nil {
 			return errors.Trace(err)
 		}
 	}
 
-	ranges, err := streamMgr.buildObserveRanges(ctx)
+	ranges, err := streamMgr.buildObserveRanges()
 	if err != nil {
 		return errors.Trace(err)
 	} else if len(ranges) == 0 {
@@ -695,7 +734,16 @@ func RunStreamStop(
 	}
 	defer streamMgr.close()
 
-	cli := streamhelper.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient())
+	etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	cli := streamhelper.NewMetaDataClient(etcdCLI)
+	defer func() {
+		if closeErr := cli.Close(); closeErr != nil {
+			log.Warn("failed to close etcd client", zap.Error(closeErr))
+		}
+	}()
 	// to add backoff
 	ti, err := cli.GetTask(ctx, cfg.TaskName)
 	if err != nil {
@@ -745,7 +793,16 @@ func RunStreamPause(
 	}
 	defer streamMgr.close()
 
-	cli := streamhelper.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient())
+	etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	cli := streamhelper.NewMetaDataClient(etcdCLI)
+	defer func() {
+		if closeErr := cli.Close(); closeErr != nil {
+			log.Warn("failed to close etcd client", zap.Error(closeErr))
+		}
+	}()
 	// to add backoff
 	ti, isPaused, err := cli.GetTaskWithPauseStatus(ctx, cfg.TaskName)
 	if err != nil {
@@ -803,7 +860,16 @@ func RunStreamResume(
 	}
 	defer streamMgr.close()
 
-	cli := streamhelper.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient())
+	etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	cli := streamhelper.NewMetaDataClient(etcdCLI)
+	defer func() {
+		if closeErr := cli.Close(); closeErr != nil {
+			log.Warn("failed to close etcd client", zap.Error(closeErr))
+		}
+	}()
 	// to add backoff
 	ti, isPaused, err := cli.GetTaskWithPauseStatus(ctx, cfg.TaskName)
 	if err != nil {
@@ -931,6 +997,11 @@ func RunStreamStatus(
 	if err != nil {
 		return err
 	}
+	defer func() {
+		if closeErr := ctl.Close(); closeErr != nil {
+			log.Warn("failed to close etcd client", zap.Error(closeErr))
+		}
+	}()
 	return ctl.PrintStatusOfTask(ctx, cfg.TaskName)
 }
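The next hunks change `RunStreamRestore` to open the external storage once with `GetStorage` and hand the handle to `getLogRangeWithStorage`, rather than having `getLogRange` re-open it from the config. A sketch of that storage-first read, assuming br's `storage` package API and a local backend; the `"backupmeta"` name mirrors `metautil.MetaFile`:

```go
package main

import (
	"context"
	"fmt"

	"github.com/pingcap/tidb/br/pkg/storage"
)

func main() {
	ctx := context.Background()

	// Open the external storage once...
	backend, err := storage.ParseBackend("local:///tmp/backup", nil)
	if err != nil {
		panic(err)
	}
	s, err := storage.New(ctx, backend, &storage.ExternalStorageOptions{})
	if err != nil {
		panic(err)
	}

	// ...then pass the handle around; getLogRangeWithStorage reads the
	// backupmeta from it instead of re-parsing the backend URL.
	data, err := s.ReadFile(ctx, "backupmeta")
	if err != nil {
		panic(err)
	}
	fmt.Println(len(data))
}
```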
@@ -1103,8 +1174,16 @@ func RunStreamRestore(
 		defer span1.Finish()
 		ctx = opentracing.ContextWithSpan(ctx, span1)
 	}
 
+<<<<<<< HEAD
 	logInfo, err := getLogRange(ctx, &cfg.Config)
+=======
+	_, s, err := GetStorage(ctx, cfg.Config.Storage, &cfg.Config)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	logInfo, err := getLogRangeWithStorage(ctx, s)
+>>>>>>> 8973dddc9ed (br: no domain to run log command (#52127))
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -1156,7 +1235,11 @@ func RunStreamRestore(
 	}
 	// restore log.
 	cfg.adjustRestoreConfigForStreamRestore()
+<<<<<<< HEAD
 	if err := restoreStream(ctx, g, cfg, logInfo.logMinTS, logInfo.logMaxTS); err != nil {
+=======
+	if err := restoreStream(ctx, g, cfg, curTaskInfo); err != nil {
+>>>>>>> 8973dddc9ed (br: no domain to run log command (#52127))
 		return errors.Trace(err)
 	}
 	return nil
@@ -1167,7 +1250,11 @@ func restoreStream(
 	c context.Context,
 	g glue.Glue,
 	cfg *RestoreConfig,
+<<<<<<< HEAD
 	logMinTS, logMaxTS uint64,
+=======
+	taskInfo *checkpoint.CheckpointTaskInfoForLogRestore,
+>>>>>>> 8973dddc9ed (br: no domain to run log command (#52127))
 ) (err error) {
 	var (
 		totalKVCount uint64
@@ -1247,8 +1334,13 @@ func restoreStream(
 		return err
 	}
 
+<<<<<<< HEAD
 	// get full backup meta to generate rewrite rules.
 	fullBackupTables, err := initFullBackupTables(ctx, cfg)
+=======
+	// get full backup meta storage to generate rewrite rules.
+	fullBackupStorage, err := parseFullBackupTablesStorage(cfg)
+>>>>>>> 8973dddc9ed (br: no domain to run log command (#52127))
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -1408,15 +1500,6 @@ func withProgress(p glue.Progress, cc func(p glue.Progress) error) error {
 	return cc(p)
 }
 
-// nolint: unused, deadcode
-func countIndices(ts map[int64]*metautil.Table) int64 {
-	result := int64(0)
-	for _, t := range ts {
-		result += int64(len(t.Info.Indices))
-	}
-	return result
-}
-
 type backupLogInfo struct {
 	logMaxTS uint64
 	logMinTS uint64
@@ -1432,7 +1515,17 @@ func getLogRange(
 	if err != nil {
 		return backupLogInfo{}, errors.Trace(err)
 	}
+<<<<<<< HEAD
+
+=======
+	return getLogRangeWithStorage(ctx, s)
+}
+func getLogRangeWithStorage(
+	ctx context.Context,
+	s storage.ExternalStorage,
+) (backupLogInfo, error) {
+>>>>>>> 8973dddc9ed (br: no domain to run log command (#52127))
 	// logStartTS: Get log start ts from backupmeta file.
 	metaData, err := s.ReadFile(ctx, metautil.MetaFile)
 	if err != nil {
 		return backupLogInfo{}, errors.Trace(err)
 	}
@@ -1513,8 +1606,12 @@ func getFullBackupTS(
 	return backupmeta.GetEndVersion(), backupmeta.GetClusterId(), nil
 }
 
+<<<<<<< HEAD
 func initFullBackupTables(
 	ctx context.Context,
+=======
+func parseFullBackupTablesStorage(
+>>>>>>> 8973dddc9ed (br: no domain to run log command (#52127))
 	cfg *RestoreConfig,
 ) (map[int64]*metautil.Table, error) {
 	var storage string
@@ -1653,6 +1750,7 @@ func buildPauseSafePointName(taskName string) string {
 	return fmt.Sprintf("%s_pause_safepoint", taskName)
 }
 
+<<<<<<< HEAD
 func checkPiTRRequirements(ctx context.Context, g glue.Glue, cfg *RestoreConfig) error {
 	mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
 		cfg.CheckRequirements, true, conn.StreamVersionChecker)
@@ -1661,6 +1759,9 @@ func checkPiTRRequirements(ctx context.Context, g glue.Glue, cfg *RestoreConfig)
 	}
 	defer mgr.Close()
 
+=======
+func checkPiTRRequirements(mgr *conn.Mgr) error {
+>>>>>>> 8973dddc9ed (br: no domain to run log command (#52127))
 	userDBs := restore.GetExistedUserDBs(mgr.GetDomain())
 	if len(userDBs) > 0 {
 		userDBNames := make([]string, 0, len(userDBs))
@@ -1674,3 +1775,98 @@ func checkPiTRRequirements(ctx context.Context, g glue.Glue, cfg *RestoreConfig)
 
 	return nil
 }
+<<<<<<< HEAD
+=======
+
+func checkPiTRTaskInfo(
+	ctx context.Context,
+	g glue.Glue,
+	s storage.ExternalStorage,
+	cfg *RestoreConfig,
+) (*checkpoint.CheckpointTaskInfoForLogRestore, bool, error) {
+	var (
+		doFullRestore = (len(cfg.FullBackupStorage) > 0)
+
+		curTaskInfo *checkpoint.CheckpointTaskInfoForLogRestore
+
+		errTaskMsg string
+	)
+	mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
+		cfg.CheckRequirements, true, conn.StreamVersionChecker)
+	if err != nil {
+		return nil, false, errors.Trace(err)
+	}
+	defer mgr.Close()
+
+	clusterID := mgr.GetPDClient().GetClusterID(ctx)
+	if cfg.UseCheckpoint {
+		exists, err := checkpoint.ExistsCheckpointTaskInfo(ctx, s, clusterID)
+		if err != nil {
+			return nil, false, errors.Trace(err)
+		}
+		if exists {
+			curTaskInfo, err = checkpoint.LoadCheckpointTaskInfoForLogRestore(ctx, s, clusterID)
+			if err != nil {
+				return nil, false, errors.Trace(err)
+			}
+			// TODO: check whether the user has manually modified the cluster (DDL). If so,
+			// regard the behavior as restoring from scratch. (update `curTaskInfo.RewriteTS`
+			// to 0 as an uninitialized value)

+			// The task info is written to external storage with a status other than
+			// `InSnapshotRestore` only after the id-maps have been persisted to external
+			// storage, so in that case there is no need to run snapshot restore again.
+			if curTaskInfo.StartTS == cfg.StartTS && curTaskInfo.RestoreTS == cfg.RestoreTS {
+				// the same task: check whether to skip snapshot restore
+				doFullRestore = doFullRestore && (curTaskInfo.Progress == checkpoint.InSnapshotRestore)
+				// update the snapshot restore task name to clean up in the end
+				if !doFullRestore && (len(cfg.FullBackupStorage) > 0) {
+					_ = cfg.generateSnapshotRestoreTaskName(clusterID)
+				}
+				log.Info("the same task", zap.Bool("skip-snapshot-restore", !doFullRestore))
+			} else {
+				// not the same task, so overwrite the task info with a new task
+				log.Info("not the same task, start to restore from scratch")
+				errTaskMsg = fmt.Sprintf(
+					"a new task [start-ts=%d] [restored-ts=%d] while the last task info: [start-ts=%d] [restored-ts=%d] [skip-snapshot-restore=%t]",
+					cfg.StartTS, cfg.RestoreTS, curTaskInfo.StartTS, curTaskInfo.RestoreTS, curTaskInfo.Progress == checkpoint.InLogRestoreAndIdMapPersist)
+
+				curTaskInfo = nil
+			}
+		}
+	}
+
+	// restore full snapshot precheck.
+	if doFullRestore {
+		if !(cfg.UseCheckpoint && curTaskInfo != nil) {
+			// The requirement check is skipped only when checkpoints are used and
+			// this is not the first execution.
+			log.Info("check pitr requirements for the first execution")
+			if err := checkPiTRRequirements(mgr); err != nil {
+				if len(errTaskMsg) > 0 {
+					err = errors.Annotatef(err, "The current restore task is regarded as %s. "+
+						"If you are sure that no changes have been made to the cluster since the last execution, "+
+						"you can adjust the `start-ts` or `restored-ts` to continue with the previous execution. "+
+						"Otherwise, if you want to restore from scratch, please clean the cluster first", errTaskMsg)
+				}
+				return nil, false, errors.Trace(err)
+			}
+		}
+	}
+
+	// persist the new task info
+	if cfg.UseCheckpoint && curTaskInfo == nil {
+		log.Info("save checkpoint task info with `InSnapshotRestore` status")
+		if err := checkpoint.SaveCheckpointTaskInfoForLogRestore(ctx, s, &checkpoint.CheckpointTaskInfoForLogRestore{
+			Progress:  checkpoint.InSnapshotRestore,
+			StartTS:   cfg.StartTS,
+			RestoreTS: cfg.RestoreTS,
+			// updated in the `InLogRestoreAndIdMapPersist` stage
+			RewriteTS:    0,
+			TiFlashItems: nil,
+		}, clusterID); err != nil {
+			return nil, false, errors.Trace(err)
+		}
+	}
+
+	return curTaskInfo, doFullRestore, nil
+}
+>>>>>>> 8973dddc9ed (br: no domain to run log command (#52127))
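`checkPiTRTaskInfo` mixes I/O with one small decision: whether the snapshot (full) restore phase can be skipped. Distilled into a standalone sketch; all names and types here are invented for illustration, br's real ones live in the `checkpoint` package:

```go
package main

import "fmt"

type progress int

const (
	inSnapshotRestore progress = iota
	inLogRestoreAndIdMapPersist
)

type taskInfo struct {
	startTS, restoreTS uint64
	progress           progress
}

// needSnapshotRestore reports whether the snapshot restore phase must run.
// It is skipped only when a checkpoint describes the same task and that task
// already made it past the snapshot phase.
func needSnapshotRestore(prev *taskInfo, startTS, restoreTS uint64, haveFullBackup bool) bool {
	if !haveFullBackup {
		return false // nothing to restore a snapshot from
	}
	if prev == nil || prev.startTS != startTS || prev.restoreTS != restoreTS {
		return true // no checkpoint, or a different task: restore from scratch
	}
	return prev.progress == inSnapshotRestore
}

func main() {
	prev := &taskInfo{startTS: 100, restoreTS: 200, progress: inLogRestoreAndIdMapPersist}
	fmt.Println(needSnapshotRestore(prev, 100, 200, true)) // false: snapshot phase already done
	fmt.Println(needSnapshotRestore(prev, 100, 300, true)) // true: different task
}
```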
diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel
index c06a452711b9d..fc147264cb9ac 100644
--- a/br/pkg/utils/BUILD.bazel
+++ b/br/pkg/utils/BUILD.bazel
@@ -90,7 +90,7 @@ go_test(
     ],
     embed = [":utils"],
     flaky = True,
-    shard_count = 33,
+    shard_count = 32,
     deps = [
         "//br/pkg/errors",
         "//br/pkg/metautil",
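One unit-handling note before the db.go hunk below: kv.go parses `region-split-size` with `units.RAMInBytes`, while `GetSplitSize` uses `units.FromHumanSize`, and the two disagree on the multiplier for the same suffix (RAMInBytes is 1024-based, FromHumanSize is 1000-based). If that reading of go-units is right, a TiKV value of "96MB" parses to 96,000,000 here rather than the 96 * 1024 * 1024 used as the fallback default. A standalone check:

```go
package main

import (
	"fmt"

	"github.com/docker/go-units"
)

func main() {
	rib, _ := units.RAMInBytes("96MB")    // 100663296: RAMInBytes is binary (1024-based)
	fhs, _ := units.FromHumanSize("96MB") // 96000000: FromHumanSize is decimal (1000-based)
	fmt.Println(rib, fhs)
}
```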
diff --git a/br/pkg/utils/db.go b/br/pkg/utils/db.go
index 701379f5aa67b..d7377ee180971
--- a/br/pkg/utils/db.go
+++ b/br/pkg/utils/db.go
@@ -4,8 +4,12 @@ package utils
 
 import (
 	"context"
+<<<<<<< HEAD
 	"database/sql"
 	"strings"
+=======
+	"strconv"
+>>>>>>> 8973dddc9ed (br: no domain to run log command (#52127))
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
@@ -28,6 +32,7 @@
 	logBackupTaskCount = atomic.NewInt32(0)
 )
 
+<<<<<<< HEAD
 // QueryExecutor is a interface for exec query
 type QueryExecutor interface {
 	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
@@ -96,6 +101,72 @@ func IsLogBackupEnabled(ctx sqlexec.RestrictedSQLExecutor) (bool, error) {
 		}
 	}
 	return true, nil
+=======
+func GetRegionSplitInfo(ctx sqlexec.RestrictedSQLExecutor) (uint64, int64) {
+	return GetSplitSize(ctx), GetSplitKeys(ctx)
+}
+
+func GetSplitSize(ctx sqlexec.RestrictedSQLExecutor) uint64 {
+	const defaultSplitSize = 96 * 1024 * 1024
+	varStr := "show config where name = 'coprocessor.region-split-size' and type = 'tikv'"
+	rows, fields, err := ctx.ExecRestrictedSQL(
+		kv.WithInternalSourceType(context.Background(), kv.InternalTxnBR),
+		nil,
+		varStr,
+	)
+	if err != nil {
+		log.Warn("failed to get split size, use default value", logutil.ShortError(err))
+		return defaultSplitSize
+	}
+	if len(rows) == 0 {
+		// use the default value
+		return defaultSplitSize
+	}
+
+	d := rows[0].GetDatum(3, &fields[3].Column.FieldType)
+	splitSizeStr, err := d.ToString()
+	if err != nil {
+		log.Warn("failed to get split size, use default value", logutil.ShortError(err))
+		return defaultSplitSize
+	}
+	splitSize, err := units.FromHumanSize(splitSizeStr)
+	if err != nil {
+		log.Warn("failed to get split size, use default value", logutil.ShortError(err))
+		return defaultSplitSize
+	}
+	return uint64(splitSize)
+}
+
+func GetSplitKeys(ctx sqlexec.RestrictedSQLExecutor) int64 {
+	const defaultSplitKeys = 960000
+	varStr := "show config where name = 'coprocessor.region-split-keys' and type = 'tikv'"
+	rows, fields, err := ctx.ExecRestrictedSQL(
+		kv.WithInternalSourceType(context.Background(), kv.InternalTxnBR),
+		nil,
+		varStr,
+	)
+	if err != nil {
+		log.Warn("failed to get split keys, use default value", logutil.ShortError(err))
+		return defaultSplitKeys
+	}
+	if len(rows) == 0 {
+		// use the default value
+		return defaultSplitKeys
+	}
+
+	d := rows[0].GetDatum(3, &fields[3].Column.FieldType)
+	splitKeysStr, err := d.ToString()
+	if err != nil {
+		log.Warn("failed to get split keys, use default value", logutil.ShortError(err))
+		return defaultSplitKeys
+	}
+	splitKeys, err := strconv.ParseInt(splitKeysStr, 10, 64)
+	if err != nil {
+		log.Warn("failed to get split keys, use default value", logutil.ShortError(err))
+		return defaultSplitKeys
+	}
+	return splitKeys
+>>>>>>> 8973dddc9ed (br: no domain to run log command (#52127))
 }
 
 func GetGcRatio(ctx sqlexec.RestrictedSQLExecutor) (string, error) {
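`GetSplitSize` and `GetSplitKeys` above share the same query-parse-fallback shape. A generic helper along these lines could collapse the duplication; this sketch uses invented names and is not part of the PR:

```go
package main

import (
	"fmt"
	"strconv"
)

// queryConfig stands in for the ExecRestrictedSQL + GetDatum + ToString dance:
// it returns the Value column of `show config where name = ...` as a string.
type queryConfig func(name string) (string, error)

// configOrDefault captures the shared shape of GetSplitSize/GetSplitKeys:
// query one config entry, parse it, and fall back to a default on any failure
// (the real code also logs a warning when the query or parse fails).
func configOrDefault[T any](q queryConfig, name string, parse func(string) (T, error), def T) T {
	s, err := q(name)
	if err != nil {
		return def
	}
	v, err := parse(s)
	if err != nil {
		return def
	}
	return v
}

func main() {
	q := queryConfig(func(string) (string, error) { return "960000", nil })
	keys := configOrDefault(q, "coprocessor.region-split-keys",
		func(s string) (int64, error) { return strconv.ParseInt(s, 10, 64) }, int64(960000))
	fmt.Println(keys) // 960000
}
```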
diff --git a/br/pkg/utils/db_test.go b/br/pkg/utils/db_test.go
index 1334d868641f0..ba16ae76c0c5f
--- a/br/pkg/utils/db_test.go
+++ b/br/pkg/utils/db_test.go
@@ -51,76 +51,6 @@ func (m *mockRestrictedSQLExecutor) ExecRestrictedSQL(ctx context.Context, opts
 	return nil, nil, nil
 }
 
-func TestIsLogBackupEnabled(t *testing.T) {
-	// config format:
-	// MySQL [(none)]> show config where name="log-backup.enable";
-	// +------+-----------------+-------------------+-------+
-	// | Type | Instance        | Name              | Value |
-	// +------+-----------------+-------------------+-------+
-	// | tikv | 127.0.0.1:20161 | log-backup.enable | false |
-	// | tikv | 127.0.0.1:20162 | log-backup.enable | false |
-	// | tikv | 127.0.0.1:20160 | log-backup.enable | false |
-	// +------+-----------------+-------------------+-------+
-	fields := make([]*ast.ResultField, 4)
-	tps := []*types.FieldType{
-		types.NewFieldType(mysql.TypeString),
-		types.NewFieldType(mysql.TypeString),
-		types.NewFieldType(mysql.TypeString),
-		types.NewFieldType(mysql.TypeString),
-	}
-	for i := 0; i < len(tps); i++ {
-		rf := new(ast.ResultField)
-		rf.Column = new(model.ColumnInfo)
-		rf.Column.FieldType = *tps[i]
-		fields[i] = rf
-	}
-	rows := make([]chunk.Row, 0, 1)
-
-	// case 1: non of tikvs enabled log-backup expected false
-	// tikv | 127.0.0.1:20161 | log-backup.enable | false |
-	row := chunk.MutRowFromValues("tikv", " 127.0.0.1:20161", "log-backup.enable", "false").ToRow()
-	rows = append(rows, row)
-	s := &mockRestrictedSQLExecutor{rows: rows, fields: fields}
-	enabled, err := utils.IsLogBackupEnabled(s)
-	require.NoError(t, err)
-	require.False(t, enabled)
-
-	// case 2: one of tikvs enabled log-backup expected false
-	// tikv | 127.0.0.1:20161 | log-backup.enable | false |
-	// tikv | 127.0.0.1:20162 | log-backup.enable | true |
-	rows = nil
-	row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20161", "log-backup.enable", "false").ToRow()
-	rows = append(rows, row)
-	row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20162", "log-backup.enable", "true").ToRow()
-	rows = append(rows, row)
-	s = &mockRestrictedSQLExecutor{rows: rows, fields: fields}
-	enabled, err = utils.IsLogBackupEnabled(s)
-	require.NoError(t, err)
-	require.False(t, enabled)
-
-	// case 3: all of tikvs enabled log-backup expected true
-	// tikv | 127.0.0.1:20161 | log-backup.enable | true |
-	// tikv | 127.0.0.1:20162 | log-backup.enable | true |
-	// tikv | 127.0.0.1:20163 | log-backup.enable | true |
-	rows = nil
-	row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20161", "log-backup.enable", "true").ToRow()
-	rows = append(rows, row)
-	row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20162", "log-backup.enable", "true").ToRow()
-	rows = append(rows, row)
-	row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20163", "log-backup.enable", "true").ToRow()
-	rows = append(rows, row)
-	s = &mockRestrictedSQLExecutor{rows: rows, fields: fields}
-	enabled, err = utils.IsLogBackupEnabled(s)
-	require.NoError(t, err)
-	require.True(t, enabled)
-
-	// case 4: met error and expected false.
-	s = &mockRestrictedSQLExecutor{errHappen: true}
-	enabled, err = utils.IsLogBackupEnabled(s)
-	require.Error(t, err)
-	require.False(t, enabled)
-}
-
 func TestCheckLogBackupTaskExist(t *testing.T) {
 	require.False(t, utils.CheckLogBackupTaskExist())
 	utils.LogBackupTaskCountInc()