From bdcafcd3d118f2929defcfda78cab92cd524d9e8 Mon Sep 17 00:00:00 2001
From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com>
Date: Thu, 25 Apr 2024 11:00:11 +0800
Subject: [PATCH] This is an automated cherry-pick of #52127

Signed-off-by: ti-chi-bot
---
 br/pkg/config/kv.go            |  74 +++++++++++++++
 br/pkg/conn/BUILD.bazel        |   2 +-
 br/pkg/conn/conn.go            |  19 ++++
 br/pkg/conn/conn_test.go       | 163 +++++++++++++++++++++++++++++++++
 br/pkg/stream/stream_status.go |   9 ++
 br/pkg/task/stream.go          | 100 +++++++++++++-------
 br/pkg/utils/BUILD.bazel       |   4 +
 br/pkg/utils/db.go             |   4 +-
 br/pkg/utils/db_test.go        |  70 --------------
 9 files changed, 340 insertions(+), 105 deletions(-)
 create mode 100644 br/pkg/config/kv.go

diff --git a/br/pkg/config/kv.go b/br/pkg/config/kv.go
new file mode 100644
index 0000000000000..e211887099854
--- /dev/null
+++ b/br/pkg/config/kv.go
@@ -0,0 +1,74 @@
+// Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0.
+package config
+
+import (
+    "encoding/json"
+
+    "github.com/docker/go-units"
+)
+
+type ConfigTerm[T uint | uint64] struct {
+    Value    T
+    Modified bool
+}
+
+type KVConfig struct {
+    ImportGoroutines    ConfigTerm[uint]
+    MergeRegionSize     ConfigTerm[uint64]
+    MergeRegionKeyCount ConfigTerm[uint64]
+}
+
+func ParseImportThreadsFromConfig(resp []byte) (uint, error) {
+    type importer struct {
+        Threads uint `json:"num-threads"`
+    }
+
+    type config struct {
+        Import importer `json:"import"`
+    }
+    var c config
+    e := json.Unmarshal(resp, &c)
+    if e != nil {
+        return 0, e
+    }
+
+    return c.Import.Threads, nil
+}
+
+func ParseMergeRegionSizeFromConfig(resp []byte) (uint64, uint64, error) {
+    type coprocessor struct {
+        RegionSplitSize string `json:"region-split-size"`
+        RegionSplitKeys uint64 `json:"region-split-keys"`
+    }
+
+    type config struct {
+        Cop coprocessor `json:"coprocessor"`
+    }
+    var c config
+    e := json.Unmarshal(resp, &c)
+    if e != nil {
+        return 0, 0, e
+    }
+    rs, e := units.RAMInBytes(c.Cop.RegionSplitSize)
+    if e != nil {
+        return 0, 0, e
+    }
+    urs := uint64(rs)
+    return urs, c.Cop.RegionSplitKeys, nil
+}
+
+func ParseLogBackupEnableFromConfig(resp []byte) (bool, error) {
+    type logbackup struct {
+        Enable bool `json:"enable"`
+    }
+
+    type config struct {
+        LogBackup logbackup `json:"log-backup"`
+    }
+    var c config
+    e := json.Unmarshal(resp, &c)
+    if e != nil {
+        return false, e
+    }
+    return c.LogBackup.Enable, nil
+}
diff --git a/br/pkg/conn/BUILD.bazel b/br/pkg/conn/BUILD.bazel
index db514b6112065..89c35240cecf7 100644
--- a/br/pkg/conn/BUILD.bazel
+++ b/br/pkg/conn/BUILD.bazel
@@ -45,7 +45,7 @@ go_test(
     ],
     embed = [":conn"],
     flaky = True,
-    shard_count = 7,
+    shard_count = 8,
     deps = [
         "//br/pkg/conn/util",
         "//br/pkg/pdutil",
diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go
index b9eb643bc947e..8548840404056 100644
--- a/br/pkg/conn/conn.go
+++ b/br/pkg/conn/conn.go
@@ -329,6 +329,25 @@ func (mgr *Mgr) GetMergeRegionSizeAndCount(ctx context.Context, client *http.Cli
     return regionSplitSize, regionSplitKeys
 }

+// IsLogBackupEnabled is used for br to check whether tikv has enabled log backup.
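+// It fetches the config of every live TiKV store (TiFlash is skipped) through GetConfigFromTiKV
+// and reports true only when every store has log-backup.enable set to true; any fetch or parse
+// error is returned to the caller.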
+func (mgr *Mgr) IsLogBackupEnabled(ctx context.Context, client *http.Client) (bool, error) {
+    logbackupEnable := true
+    err := mgr.GetConfigFromTiKV(ctx, client, func(resp *http.Response) error {
+        respBytes, err := io.ReadAll(resp.Body)
+        if err != nil {
+            return err
+        }
+        enable, err := kvconfig.ParseLogBackupEnableFromConfig(respBytes)
+        if err != nil {
+            log.Warn("Failed to parse log-backup enable from config", logutil.ShortError(err))
+            return err
+        }
+        logbackupEnable = logbackupEnable && enable
+        return nil
+    })
+    return logbackupEnable, errors.Trace(err)
+}
+
 // GetConfigFromTiKV get configs from all alive tikv stores.
 func (mgr *Mgr) GetConfigFromTiKV(ctx context.Context, cli *http.Client, fn func(*http.Response) error) error {
     allStores, err := GetAllTiKVStoresWithRetry(ctx, mgr.GetPDClient(), util.SkipTiFlash)
diff --git a/br/pkg/conn/conn_test.go b/br/pkg/conn/conn_test.go
index 60d15006d2971..bbe7bce8c8552 100644
--- a/br/pkg/conn/conn_test.go
+++ b/br/pkg/conn/conn_test.go
@@ -438,6 +438,169 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) {
     }
 }

+func TestIsLogBackupEnabled(t *testing.T) {
+    cases := []struct {
+        stores  []*metapb.Store
+        content []string
+        enable  bool
+        err     bool
+    }{
+        {
+            stores: []*metapb.Store{
+                {
+                    Id:    1,
+                    State: metapb.StoreState_Up,
+                    Labels: []*metapb.StoreLabel{
+                        {
+                            Key:   "engine",
+                            Value: "tiflash",
+                        },
+                    },
+                },
+            },
+            content: []string{""},
+            enable:  true,
+            err:     false,
+        },
+        {
+            stores: []*metapb.Store{
+                {
+                    Id:    1,
+                    State: metapb.StoreState_Up,
+                    Labels: []*metapb.StoreLabel{
+                        {
+                            Key:   "engine",
+                            Value: "tiflash",
+                        },
+                    },
+                },
+                {
+                    Id:    2,
+                    State: metapb.StoreState_Up,
+                    Labels: []*metapb.StoreLabel{
+                        {
+                            Key:   "engine",
+                            Value: "tikv",
+                        },
+                    },
+                },
+            },
+            content: []string{
+                "",
+                // Assuming the TiKV has failed due to some reason.
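+                // An empty config body makes the mock /config handler below cancel the test
+                // context, so this case expects an error rather than a result.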
+ "", + }, + enable: false, + err: true, + }, + { + stores: []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tikv", + }, + }, + }, + }, + content: []string{ + "{\"log-level\": \"debug\", \"log-backup\": {\"enable\": true}}", + }, + enable: true, + err: false, + }, + { + stores: []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tikv", + }, + }, + }, + }, + content: []string{ + "{\"log-level\": \"debug\", \"log-backup\": {\"enable\": false}}", + }, + enable: false, + err: false, + }, + { + stores: []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tikv", + }, + }, + }, + { + Id: 2, + State: metapb.StoreState_Up, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tikv", + }, + }, + }, + }, + content: []string{ + "{\"log-level\": \"debug\", \"log-backup\": {\"enable\": true}}", + "{\"log-level\": \"debug\", \"log-backup\": {\"enable\": false}}", + }, + enable: false, + err: false, + }, + } + + pctx := context.Background() + for _, ca := range cases { + ctx, cancel := context.WithCancel(pctx) + pdCli := utils.FakePDClient{Stores: ca.stores} + require.Equal(t, len(ca.content), len(ca.stores)) + count := 0 + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch strings.TrimSpace(r.URL.Path) { + case "/config": + if len(ca.content[count]) == 0 { + cancel() + } + _, _ = fmt.Fprint(w, ca.content[count]) + default: + http.NotFoundHandler().ServeHTTP(w, r) + } + count++ + })) + + for _, s := range ca.stores { + s.Address = mockServer.URL + s.StatusAddress = mockServer.URL + } + + httpCli := mockServer.Client() + mgr := &Mgr{PdController: &pdutil.PdController{}} + mgr.PdController.SetPDClient(pdCli) + enable, err := mgr.IsLogBackupEnabled(ctx, httpCli) + if ca.err { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, ca.enable, enable) + } + mockServer.Close() + } +} + func TestHandleTiKVAddress(t *testing.T) { cases := []struct { store *metapb.Store diff --git a/br/pkg/stream/stream_status.go b/br/pkg/stream/stream_status.go index 57b803d93d46d..f029ddf588e6d 100644 --- a/br/pkg/stream/stream_status.go +++ b/br/pkg/stream/stream_status.go @@ -344,6 +344,15 @@ func NewStatusController(meta *MetaDataClient, mgr PDInfoProvider, view TaskPrin } } +func (ctl *StatusController) Close() error { + if ctl.meta != nil { + if err := ctl.meta.Close(); err != nil { + return errors.Trace(err) + } + } + return nil +} + // fillTask queries and fills the extra information for a raw task. func (ctl *StatusController) fillTask(ctx context.Context, task Task) (TaskStatus, error) { var err error diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 0045047713483..2fd3e59449e48 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -298,7 +298,7 @@ type streamMgr struct { func NewStreamMgr(ctx context.Context, cfg *StreamConfig, g glue.Glue, isStreamStart bool) (*streamMgr, error) { mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), - cfg.CheckRequirements, true, conn.StreamVersionChecker) + cfg.CheckRequirements, false, conn.StreamVersionChecker) if err != nil { return nil, errors.Trace(err) } @@ -375,8 +375,8 @@ func (s *streamMgr) adjustAndCheckStartTS(ctx context.Context) error { } // checkImportTaskRunning checks whether there is any import task running. 
-func (s *streamMgr) checkImportTaskRunning(ctx context.Context) error {
-    list, err := utils.GetImportTasksFrom(ctx, s.mgr.GetDomain().GetEtcdClient())
+func (s *streamMgr) checkImportTaskRunning(ctx context.Context, etcdCLI *clientv3.Client) error {
+    list, err := utils.GetImportTasksFrom(ctx, etcdCLI)
     if err != nil {
         return errors.Trace(err)
     }
@@ -406,7 +406,7 @@ func (s *streamMgr) setGCSafePoint(ctx context.Context, sp utils.BRServiceSafePo
     return nil
 }

-func (s *streamMgr) buildObserveRanges(ctx context.Context) ([]kv.KeyRange, error) {
+func (s *streamMgr) buildObserveRanges() ([]kv.KeyRange, error) {
     dRanges, err := stream.BuildObserveDataRanges(
         s.mgr.GetStorage(),
         s.cfg.FilterStr,
@@ -426,13 +426,13 @@ func (s *streamMgr) buildObserveRanges(ctx context.Context) ([]kv.KeyRange, erro
     return rs, nil
 }

-func (s *streamMgr) backupFullSchemas(ctx context.Context, g glue.Glue) error {
+func (s *streamMgr) backupFullSchemas(ctx context.Context) error {
     clusterVersion, err := s.mgr.GetClusterVersion(ctx)
     if err != nil {
         return errors.Trace(err)
     }

-    metaWriter := metautil.NewMetaWriter(s.bc.GetStorage(), metautil.MetaFileSize, false, metautil.MetaFile, nil)
+    metaWriter := metautil.NewMetaWriter(s.bc.GetStorage(), metautil.MetaFileSize, true, metautil.MetaFile, nil)
     metaWriter.Update(func(m *backuppb.BackupMeta) {
         // save log startTS to backupmeta file
         m.StartVersion = s.cfg.StartTS
@@ -458,6 +458,7 @@ func (s *streamMgr) backupFullSchemas(ctx context.Context, g glue.Glue) error {
     return nil
 }

+<<<<<<< HEAD
 func (s *streamMgr) checkStreamStartEnable(g glue.Glue) error {
     se, err := g.CreateSession(s.mgr.GetStorage())
     if err != nil {
@@ -465,6 +466,10 @@ func (s *streamMgr) checkStreamStartEnable(g glue.Glue) error {
     }
     execCtx := se.GetSessionCtx().(sqlexec.RestrictedSQLExecutor)
     supportStream, err := utils.IsLogBackupEnabled(execCtx)
+=======
+func (s *streamMgr) checkStreamStartEnable(ctx context.Context) error {
+    supportStream, err := s.mgr.IsLogBackupEnabled(ctx, s.httpCli)
+>>>>>>> 8973dddc9ed (br: no domain to run log command (#52127))
     if err != nil {
         return errors.Trace(err)
     }
@@ -559,17 +564,26 @@ func RunStreamStart(
     }
     defer streamMgr.close()

-    if err = streamMgr.checkStreamStartEnable(g); err != nil {
+    if err = streamMgr.checkStreamStartEnable(ctx); err != nil {
         return errors.Trace(err)
     }
     if err = streamMgr.adjustAndCheckStartTS(ctx); err != nil {
         return errors.Trace(err)
     }
-    if err = streamMgr.checkImportTaskRunning(ctx); err != nil {
+
+    etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config)
+    if err != nil {
+        return errors.Trace(err)
+    }
+    cli := streamhelper.NewMetaDataClient(etcdCLI)
+    defer func() {
+        if closeErr := cli.Close(); closeErr != nil {
+            log.Warn("failed to close etcd client", zap.Error(closeErr))
+        }
+    }()
+    if err = streamMgr.checkImportTaskRunning(ctx, cli.Client); err != nil {
         return errors.Trace(err)
     }
-
-    cli := streamhelper.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient())
     // It supports single stream log task currently.
     if count, err := cli.GetTaskCount(ctx); err != nil {
         return errors.Trace(err)
@@ -618,12 +632,12 @@ func RunStreamStart(
         if err = streamMgr.setLock(ctx); err != nil {
             return errors.Trace(err)
         }
-        if err = streamMgr.backupFullSchemas(ctx, g); err != nil {
+        if err = streamMgr.backupFullSchemas(ctx); err != nil {
             return errors.Trace(err)
         }
     }

-    ranges, err := streamMgr.buildObserveRanges(ctx)
+    ranges, err := streamMgr.buildObserveRanges()
     if err != nil {
         return errors.Trace(err)
     } else if len(ranges) == 0 {
@@ -711,7 +725,16 @@ func RunStreamStop(
     }
     defer streamMgr.close()

-    cli := streamhelper.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient())
+    etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config)
+    if err != nil {
+        return errors.Trace(err)
+    }
+    cli := streamhelper.NewMetaDataClient(etcdCLI)
+    defer func() {
+        if closeErr := cli.Close(); closeErr != nil {
+            log.Warn("failed to close etcd client", zap.Error(closeErr))
+        }
+    }()
     // to add backoff
     ti, err := cli.GetTask(ctx, cfg.TaskName)
     if err != nil {
@@ -761,7 +784,16 @@ func RunStreamPause(
     }
     defer streamMgr.close()

-    cli := streamhelper.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient())
+    etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config)
+    if err != nil {
+        return errors.Trace(err)
+    }
+    cli := streamhelper.NewMetaDataClient(etcdCLI)
+    defer func() {
+        if closeErr := cli.Close(); closeErr != nil {
+            log.Warn("failed to close etcd client", zap.Error(closeErr))
+        }
+    }()
     // to add backoff
     ti, isPaused, err := cli.GetTaskWithPauseStatus(ctx, cfg.TaskName)
     if err != nil {
@@ -819,7 +851,16 @@ func RunStreamResume(
     }
     defer streamMgr.close()

-    cli := streamhelper.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient())
+    etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config)
+    if err != nil {
+        return errors.Trace(err)
+    }
+    cli := streamhelper.NewMetaDataClient(etcdCLI)
+    defer func() {
+        if closeErr := cli.Close(); closeErr != nil {
+            log.Warn("failed to close etcd client", zap.Error(closeErr))
+        }
+    }()
     // to add backoff
     ti, isPaused, err := cli.GetTaskWithPauseStatus(ctx, cfg.TaskName)
     if err != nil {
@@ -947,6 +988,11 @@ func RunStreamStatus(
     if err != nil {
         return err
     }
+    defer func() {
+        if closeErr := ctl.Close(); closeErr != nil {
+            log.Warn("failed to close etcd client", zap.Error(closeErr))
+        }
+    }()
     return ctl.PrintStatusOfTask(ctx, cfg.TaskName)
 }

@@ -1114,7 +1160,7 @@ func RunStreamRestore(
     if err != nil {
         return errors.Trace(err)
     }
-    logInfo, err := getLogRangeWithStorage(ctx, &cfg.Config, s)
+    logInfo, err := getLogRangeWithStorage(ctx, s)
     if err != nil {
         return errors.Trace(err)
     }
@@ -1184,7 +1230,7 @@
         }
         // restore log.
         cfg.adjustRestoreConfigForStreamRestore()
-        if err := restoreStream(ctx, g, cfg, curTaskInfo, logInfo.logMinTS, logInfo.logMaxTS); err != nil {
+        if err := restoreStream(ctx, g, cfg, curTaskInfo); err != nil {
             return errors.Trace(err)
         }
     return nil
@@ -1196,7 +1242,6 @@ func restoreStream(
     g glue.Glue,
     cfg *RestoreConfig,
     taskInfo *checkpoint.CheckpointTaskInfoForLogRestore,
-    logMinTS, logMaxTS uint64,
 ) (err error) {
     var (
         totalKVCount uint64
@@ -1317,7 +1362,7 @@ func restoreStream(
     }

     // get full backup meta storage to generate rewrite rules.
-    fullBackupStorage, err := parseFullBackupTablesStorage(ctx, cfg)
+    fullBackupStorage, err := parseFullBackupTablesStorage(cfg)
     if err != nil {
         return errors.Trace(err)
     }
@@ -1528,15 +1573,6 @@ func withProgress(p glue.Progress, cc func(p glue.Progress) error) error {
     return cc(p)
 }

-// nolint: unused, deadcode
-func countIndices(ts map[int64]*metautil.Table) int64 {
-    result := int64(0)
-    for _, t := range ts {
-        result += int64(len(t.Info.Indices))
-    }
-    return result
-}
-
 type backupLogInfo struct {
     logMaxTS uint64
     logMinTS uint64
@@ -1552,12 +1588,11 @@ func getLogRange(
     if err != nil {
         return backupLogInfo{}, errors.Trace(err)
     }
-    return getLogRangeWithStorage(ctx, cfg, s)
+    return getLogRangeWithStorage(ctx, s)
 }

 func getLogRangeWithStorage(
     ctx context.Context,
-    cfg *Config,
     s storage.ExternalStorage,
 ) (backupLogInfo, error) {
     // logStartTS: Get log start ts from backupmeta file.
@@ -1641,7 +1676,6 @@ func getFullBackupTS(
 }

 func parseFullBackupTablesStorage(
-    ctx context.Context,
     cfg *RestoreConfig,
 ) (*restore.FullBackupStorageConfig, error) {
     var storageName string
@@ -1730,7 +1764,7 @@ func buildPauseSafePointName(taskName string) string {
     return fmt.Sprintf("%s_pause_safepoint", taskName)
 }

-func checkPiTRRequirements(ctx context.Context, g glue.Glue, cfg *RestoreConfig, mgr *conn.Mgr) error {
+func checkPiTRRequirements(mgr *conn.Mgr) error {
     userDBs := restore.GetExistedUserDBs(mgr.GetDomain())
     if len(userDBs) > 0 {
         userDBNames := make([]string, 0, len(userDBs))
@@ -1807,7 +1841,7 @@ func checkPiTRTaskInfo(
         // Only when use checkpoint and not the first execution,
         // skip checking requirements.
         log.Info("check pitr requirements for the first execution")
-        if err := checkPiTRRequirements(ctx, g, cfg, mgr); err != nil {
+        if err := checkPiTRRequirements(mgr); err != nil {
             if len(errTaskMsg) > 0 {
                 err = errors.Annotatef(err, "The current restore task is regarded as %s. "+
"+ "If you ensure that no changes have been made to the cluster since the last execution, "+ diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index 31685ef6e1b66..b120478f4fcf4 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -92,7 +92,11 @@ go_test( ], embed = [":utils"], flaky = True, +<<<<<<< HEAD shard_count = 38, +======= + shard_count = 32, +>>>>>>> 8973dddc9ed (br: no domain to run log command (#52127)) deps = [ "//br/pkg/errors", "//br/pkg/metautil", diff --git a/br/pkg/utils/db.go b/br/pkg/utils/db.go index d1a722591a2ce..aa8b23d7f2d48 100644 --- a/br/pkg/utils/db.go +++ b/br/pkg/utils/db.go @@ -6,7 +6,6 @@ import ( "context" "database/sql" "strconv" - "strings" "github.com/docker/go-units" "github.com/pingcap/errors" @@ -31,6 +30,7 @@ var ( logBackupTaskCount = atomic.NewInt32(0) ) +<<<<<<< HEAD // QueryExecutor is a interface for exec query type QueryExecutor interface { QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) @@ -101,6 +101,8 @@ func IsLogBackupEnabled(ctx sqlexec.RestrictedSQLExecutor) (bool, error) { return true, nil } +======= +>>>>>>> 8973dddc9ed (br: no domain to run log command (#52127)) func GetRegionSplitInfo(ctx sqlexec.RestrictedSQLExecutor) (uint64, int64) { return GetSplitSize(ctx), GetSplitKeys(ctx) } diff --git a/br/pkg/utils/db_test.go b/br/pkg/utils/db_test.go index 1004764b0d206..6daf48e89e7ed 100644 --- a/br/pkg/utils/db_test.go +++ b/br/pkg/utils/db_test.go @@ -51,76 +51,6 @@ func (m *mockRestrictedSQLExecutor) ExecRestrictedSQL(ctx context.Context, opts return nil, nil, nil } -func TestIsLogBackupEnabled(t *testing.T) { - // config format: - // MySQL [(none)]> show config where name="log-backup.enable"; - // +------+-----------------+-------------------+-------+ - // | Type | Instance | Name | Value | - // +------+-----------------+-------------------+-------+ - // | tikv | 127.0.0.1:20161 | log-backup.enable | false | - // | tikv | 127.0.0.1:20162 | log-backup.enable | false | - // | tikv | 127.0.0.1:20160 | log-backup.enable | false | - // +------+-----------------+-------------------+-------+ - fields := make([]*ast.ResultField, 4) - tps := []*types.FieldType{ - types.NewFieldType(mysql.TypeString), - types.NewFieldType(mysql.TypeString), - types.NewFieldType(mysql.TypeString), - types.NewFieldType(mysql.TypeString), - } - for i := 0; i < len(tps); i++ { - rf := new(ast.ResultField) - rf.Column = new(model.ColumnInfo) - rf.Column.FieldType = *tps[i] - fields[i] = rf - } - rows := make([]chunk.Row, 0, 1) - - // case 1: non of tikvs enabled log-backup expected false - // tikv | 127.0.0.1:20161 | log-backup.enable | false | - row := chunk.MutRowFromValues("tikv", " 127.0.0.1:20161", "log-backup.enable", "false").ToRow() - rows = append(rows, row) - s := &mockRestrictedSQLExecutor{rows: rows, fields: fields} - enabled, err := utils.IsLogBackupEnabled(s) - require.NoError(t, err) - require.False(t, enabled) - - // case 2: one of tikvs enabled log-backup expected false - // tikv | 127.0.0.1:20161 | log-backup.enable | false | - // tikv | 127.0.0.1:20162 | log-backup.enable | true | - rows = nil - row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20161", "log-backup.enable", "false").ToRow() - rows = append(rows, row) - row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20162", "log-backup.enable", "true").ToRow() - rows = append(rows, row) - s = &mockRestrictedSQLExecutor{rows: rows, fields: fields} - enabled, err = utils.IsLogBackupEnabled(s) - require.NoError(t, err) - 
-    require.False(t, enabled)
-
-    // case 3: all of tikvs enabled log-backup expected true
-    // tikv | 127.0.0.1:20161 | log-backup.enable | true |
-    // tikv | 127.0.0.1:20162 | log-backup.enable | true |
-    // tikv | 127.0.0.1:20163 | log-backup.enable | true |
-    rows = nil
-    row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20161", "log-backup.enable", "true").ToRow()
-    rows = append(rows, row)
-    row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20162", "log-backup.enable", "true").ToRow()
-    rows = append(rows, row)
-    row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20163", "log-backup.enable", "true").ToRow()
-    rows = append(rows, row)
-    s = &mockRestrictedSQLExecutor{rows: rows, fields: fields}
-    enabled, err = utils.IsLogBackupEnabled(s)
-    require.NoError(t, err)
-    require.True(t, enabled)
-
-    // case 4: met error and expected false.
-    s = &mockRestrictedSQLExecutor{errHappen: true}
-    enabled, err = utils.IsLogBackupEnabled(s)
-    require.Error(t, err)
-    require.False(t, enabled)
-}
-
 func TestCheckLogBackupTaskExist(t *testing.T) {
     require.False(t, utils.CheckLogBackupTaskExist())
     utils.LogBackupTaskCountInc()