From 7087c28aead2b1459d2d6a2d82463100ab9af25a Mon Sep 17 00:00:00 2001 From: Leavrth Date: Wed, 27 Mar 2024 10:24:10 +0800 Subject: [PATCH 1/6] no domain to run log command Signed-off-by: Leavrth --- br/pkg/stream/stream_status.go | 9 ++++ br/pkg/task/stream.go | 83 +++++++++++++++++++++++----------- 2 files changed, 65 insertions(+), 27 deletions(-) diff --git a/br/pkg/stream/stream_status.go b/br/pkg/stream/stream_status.go index 7090e0622fabc..186567cc86f6c 100644 --- a/br/pkg/stream/stream_status.go +++ b/br/pkg/stream/stream_status.go @@ -344,6 +344,15 @@ func NewStatusController(meta *MetaDataClient, mgr PDInfoProvider, view TaskPrin } } +func (ctl *StatusController) Close() error { + if ctl.meta != nil { + if err := ctl.meta.Close(); err != nil { + return errors.Trace(err) + } + } + return nil +} + // fillTask queries and fills the extra information for a raw task. func (ctl *StatusController) fillTask(ctx context.Context, task Task) (TaskStatus, error) { var err error diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 6cdc2c47f486c..26315b77b6642 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -292,7 +292,7 @@ type streamMgr struct { func NewStreamMgr(ctx context.Context, cfg *StreamConfig, g glue.Glue, isStreamStart bool) (*streamMgr, error) { mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), - cfg.CheckRequirements, true, conn.StreamVersionChecker) + cfg.CheckRequirements, false, conn.StreamVersionChecker) if err != nil { return nil, errors.Trace(err) } @@ -401,7 +401,7 @@ func (s *streamMgr) setGCSafePoint(ctx context.Context, sp utils.BRServiceSafePo return nil } -func (s *streamMgr) buildObserveRanges(ctx context.Context) ([]kv.KeyRange, error) { +func (s *streamMgr) buildObserveRanges() ([]kv.KeyRange, error) { dRanges, err := stream.BuildObserveDataRanges( s.mgr.GetStorage(), s.cfg.FilterStr, @@ -421,7 +421,7 @@ func (s *streamMgr) buildObserveRanges(ctx context.Context) ([]kv.KeyRange, erro return rs, nil } -func (s *streamMgr) backupFullSchemas(ctx context.Context, g glue.Glue) error { +func (s *streamMgr) backupFullSchemas(ctx context.Context) error { clusterVersion, err := s.mgr.GetClusterVersion(ctx) if err != nil { return errors.Trace(err) @@ -568,7 +568,16 @@ func RunStreamStart( return errors.Trace(err) } - cli := streamhelper.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient()) + etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config) + if err != nil { + return errors.Trace(err) + } + cli := streamhelper.NewMetaDataClient(etcdCLI) + defer func() { + if closeErr := cli.Close(); closeErr != nil { + log.Warn("failed to close etcd client", zap.Error(closeErr)) + } + }() // It supports single stream log task currently. 
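	// GetTaskCount reports how many log backup tasks are already registered
	// through the metadata client (backed by etcd), so a nonzero count is
	// expected to make this start request fail rather than create a second task.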
if count, err := cli.GetTaskCount(ctx); err != nil { return errors.Trace(err) @@ -617,12 +626,12 @@ func RunStreamStart( if err = streamMgr.setLock(ctx); err != nil { return errors.Trace(err) } - if err = streamMgr.backupFullSchemas(ctx, g); err != nil { + if err = streamMgr.backupFullSchemas(ctx); err != nil { return errors.Trace(err) } } - ranges, err := streamMgr.buildObserveRanges(ctx) + ranges, err := streamMgr.buildObserveRanges() if err != nil { return errors.Trace(err) } else if len(ranges) == 0 { @@ -710,7 +719,16 @@ func RunStreamStop( } defer streamMgr.close() - cli := streamhelper.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient()) + etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config) + if err != nil { + return errors.Trace(err) + } + cli := streamhelper.NewMetaDataClient(etcdCLI) + defer func() { + if closeErr := cli.Close(); closeErr != nil { + log.Warn("failed to close etcd client", zap.Error(closeErr)) + } + }() // to add backoff ti, err := cli.GetTask(ctx, cfg.TaskName) if err != nil { @@ -760,7 +778,16 @@ func RunStreamPause( } defer streamMgr.close() - cli := streamhelper.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient()) + etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config) + if err != nil { + return errors.Trace(err) + } + cli := streamhelper.NewMetaDataClient(etcdCLI) + defer func() { + if closeErr := cli.Close(); closeErr != nil { + log.Warn("failed to close etcd client", zap.Error(closeErr)) + } + }() // to add backoff ti, isPaused, err := cli.GetTaskWithPauseStatus(ctx, cfg.TaskName) if err != nil { @@ -818,7 +845,16 @@ func RunStreamResume( } defer streamMgr.close() - cli := streamhelper.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient()) + etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config) + if err != nil { + return errors.Trace(err) + } + cli := streamhelper.NewMetaDataClient(etcdCLI) + defer func() { + if closeErr := cli.Close(); closeErr != nil { + log.Warn("failed to close etcd client", zap.Error(closeErr)) + } + }() // to add backoff ti, isPaused, err := cli.GetTaskWithPauseStatus(ctx, cfg.TaskName) if err != nil { @@ -946,6 +982,11 @@ func RunStreamStatus( if err != nil { return err } + defer func() { + if closeErr := ctl.Close(); closeErr != nil { + log.Warn("failed to close etcd client", zap.Error(closeErr)) + } + }() return ctl.PrintStatusOfTask(ctx, cfg.TaskName) } @@ -1112,7 +1153,7 @@ func RunStreamRestore( if err != nil { return errors.Trace(err) } - logInfo, err := getLogRangeWithStorage(ctx, &cfg.Config, s) + logInfo, err := getLogRangeWithStorage(ctx, s) if err != nil { return errors.Trace(err) } @@ -1182,7 +1223,7 @@ func RunStreamRestore( } // restore log. cfg.adjustRestoreConfigForStreamRestore() - if err := restoreStream(ctx, g, cfg, curTaskInfo, logInfo.logMinTS, logInfo.logMaxTS); err != nil { + if err := restoreStream(ctx, g, cfg, curTaskInfo); err != nil { return errors.Trace(err) } return nil @@ -1194,7 +1235,6 @@ func restoreStream( g glue.Glue, cfg *RestoreConfig, taskInfo *checkpoint.CheckpointTaskInfoForLogRestore, - logMinTS, logMaxTS uint64, ) (err error) { var ( totalKVCount uint64 @@ -1315,7 +1355,7 @@ func restoreStream( } // get full backup meta storage to generate rewrite rules. 
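	// parseFullBackupTablesStorage only reads fields of the restore config
	// (see its definition below), so the unused context parameter is dropped
	// from both the call site and the signature.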
- fullBackupStorage, err := parseFullBackupTablesStorage(ctx, cfg) + fullBackupStorage, err := parseFullBackupTablesStorage(cfg) if err != nil { return errors.Trace(err) } @@ -1540,15 +1580,6 @@ func withProgress(p glue.Progress, cc func(p glue.Progress) error) error { return cc(p) } -// nolint: unused, deadcode -func countIndices(ts map[int64]*metautil.Table) int64 { - result := int64(0) - for _, t := range ts { - result += int64(len(t.Info.Indices)) - } - return result -} - type backupLogInfo struct { logMaxTS uint64 logMinTS uint64 @@ -1564,12 +1595,11 @@ func getLogRange( if err != nil { return backupLogInfo{}, errors.Trace(err) } - return getLogRangeWithStorage(ctx, cfg, s) + return getLogRangeWithStorage(ctx, s) } func getLogRangeWithStorage( ctx context.Context, - cfg *Config, s storage.ExternalStorage, ) (backupLogInfo, error) { // logStartTS: Get log start ts from backupmeta file. @@ -1653,7 +1683,6 @@ func getFullBackupTS( } func parseFullBackupTablesStorage( - ctx context.Context, cfg *RestoreConfig, ) (*restore.FullBackupStorageConfig, error) { var storageName string @@ -1742,7 +1771,7 @@ func buildPauseSafePointName(taskName string) string { return fmt.Sprintf("%s_pause_safepoint", taskName) } -func checkPiTRRequirements(ctx context.Context, g glue.Glue, cfg *RestoreConfig, mgr *conn.Mgr) error { +func checkPiTRRequirements(mgr *conn.Mgr) error { userDBs := restore.GetExistedUserDBs(mgr.GetDomain()) if len(userDBs) > 0 { userDBNames := make([]string, 0, len(userDBs)) @@ -1819,7 +1848,7 @@ func checkPiTRTaskInfo( // Only when use checkpoint and not the first execution, // skip checking requirements. log.Info("check pitr requirements for the first execution") - if err := checkPiTRRequirements(ctx, g, cfg, mgr); err != nil { + if err := checkPiTRRequirements(mgr); err != nil { if len(errTaskMsg) > 0 { err = errors.Annotatef(err, "The current restore task is regarded as %s. "+ "If you ensure that no changes have been made to the cluster since the last execution, "+ From d4703faa124bec70554b833e954320353cc0eaf9 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Wed, 27 Mar 2024 15:10:00 +0800 Subject: [PATCH 2/6] fix panic Signed-off-by: Leavrth --- br/pkg/task/stream.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 26315b77b6642..fa66369ccd1d3 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -370,8 +370,8 @@ func (s *streamMgr) adjustAndCheckStartTS(ctx context.Context) error { } // checkImportTaskRunning checks whether there is any import task running. 
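// The etcd client is now passed in by the caller instead of being taken from
// the TiDB domain, so the check only needs a raw *clientv3.Client and can run
// without a domain being created.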
-func (s *streamMgr) checkImportTaskRunning(ctx context.Context) error { - list, err := utils.GetImportTasksFrom(ctx, s.mgr.GetDomain().GetEtcdClient()) +func (s *streamMgr) checkImportTaskRunning(ctx context.Context, etcdCLI *clientv3.Client) error { + list, err := utils.GetImportTasksFrom(ctx, etcdCLI) if err != nil { return errors.Trace(err) } @@ -427,7 +427,7 @@ func (s *streamMgr) backupFullSchemas(ctx context.Context) error { return errors.Trace(err) } - metaWriter := metautil.NewMetaWriter(s.bc.GetStorage(), metautil.MetaFileSize, false, metautil.MetaFile, nil) + metaWriter := metautil.NewMetaWriter(s.bc.GetStorage(), metautil.MetaFileSize, true, metautil.MetaFile, nil) metaWriter.Update(func(m *backuppb.BackupMeta) { // save log startTS to backupmeta file m.StartVersion = s.cfg.StartTS @@ -564,14 +564,15 @@ func RunStreamStart( if err = streamMgr.adjustAndCheckStartTS(ctx); err != nil { return errors.Trace(err) } - if err = streamMgr.checkImportTaskRunning(ctx); err != nil { - return errors.Trace(err) - } etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config) if err != nil { return errors.Trace(err) } + if err = streamMgr.checkImportTaskRunning(ctx, etcdCLI); err != nil { + return errors.Trace(err) + } + cli := streamhelper.NewMetaDataClient(etcdCLI) defer func() { if closeErr := cli.Close(); closeErr != nil { From df7ffb2ada48e942c10b57b643ae907d6b8d0f89 Mon Sep 17 00:00:00 2001 From: Jianjun Liao Date: Mon, 22 Apr 2024 16:35:53 +0800 Subject: [PATCH 3/6] handle etcd client close Signed-off-by: Jianjun Liao --- br/pkg/task/stream.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index fa66369ccd1d3..d01b5610ada77 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -569,16 +569,15 @@ func RunStreamStart( if err != nil { return errors.Trace(err) } - if err = streamMgr.checkImportTaskRunning(ctx, etcdCLI); err != nil { - return errors.Trace(err) - } - cli := streamhelper.NewMetaDataClient(etcdCLI) defer func() { if closeErr := cli.Close(); closeErr != nil { log.Warn("failed to close etcd client", zap.Error(closeErr)) } }() + if err = streamMgr.checkImportTaskRunning(ctx, cli.Client); err != nil { + return errors.Trace(err) + } // It supports single stream log task currently. 
if count, err := cli.GetTaskCount(ctx); err != nil { return errors.Trace(err) From b9f5974643fc3cfc5cb1aa8e8fcd61e46d31a5bb Mon Sep 17 00:00:00 2001 From: Jianjun Liao Date: Tue, 23 Apr 2024 16:04:09 +0800 Subject: [PATCH 4/6] debug integration test Signed-off-by: Jianjun Liao --- br/tests/br_pitr_failpoint/run.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/br/tests/br_pitr_failpoint/run.sh b/br/tests/br_pitr_failpoint/run.sh index 8ce6d4b49b526..5c81643246d6d 100644 --- a/br/tests/br_pitr_failpoint/run.sh +++ b/br/tests/br_pitr_failpoint/run.sh @@ -54,6 +54,8 @@ while true; do retry_cnt=$((retry_cnt+1)) if [ "$retry_cnt" -gt 50 ]; then + run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs';" + cat $res_file echo 'the wait lag is too large' exit 1 fi From 251072f2da49dfe81ed5a2a30638a5bfa54f3dc5 Mon Sep 17 00:00:00 2001 From: Jianjun Liao Date: Wed, 24 Apr 2024 11:07:57 +0800 Subject: [PATCH 5/6] use http client to get tikv config Signed-off-by: Jianjun Liao --- br/pkg/config/kv.go | 16 +++ br/pkg/conn/conn.go | 19 ++++ br/pkg/conn/conn_test.go | 163 ++++++++++++++++++++++++++++++ br/pkg/task/stream.go | 11 +- br/pkg/utils/db.go | 48 --------- br/pkg/utils/db_test.go | 70 ------------- br/tests/br_pitr_failpoint/run.sh | 2 - 7 files changed, 201 insertions(+), 128 deletions(-) diff --git a/br/pkg/config/kv.go b/br/pkg/config/kv.go index 0283c8d3cc721..e211887099854 100644 --- a/br/pkg/config/kv.go +++ b/br/pkg/config/kv.go @@ -56,3 +56,19 @@ func ParseMergeRegionSizeFromConfig(resp []byte) (uint64, uint64, error) { urs := uint64(rs) return urs, c.Cop.RegionSplitKeys, nil } + +func ParseLogBackupEnableFromConfig(resp []byte) (bool, error) { + type logbackup struct { + Enable bool `json:"enable"` + } + + type config struct { + LogBackup logbackup `json:"log-backup"` + } + var c config + e := json.Unmarshal(resp, &c) + if e != nil { + return false, e + } + return c.LogBackup.Enable, nil +} diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index 16c86e18c8985..87accda625a22 100644 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -352,6 +352,25 @@ func (mgr *Mgr) ProcessTiKVConfigs(ctx context.Context, cfg *kvconfig.KVConfig, } } +// IsLogBackupEnabled is used for br to check whether tikv has enabled log backup. +func (mgr *Mgr) IsLogBackupEnabled(ctx context.Context, client *http.Client) (bool, error) { + logbackupEnable := true + err := mgr.GetConfigFromTiKV(ctx, client, func(resp *http.Response) error { + respBytes, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + enable, err := kvconfig.ParseLogBackupEnableFromConfig(respBytes) + if err != nil { + log.Warn("Failed to parse log-backup enable from config", logutil.ShortError(err)) + return err + } + logbackupEnable = logbackupEnable && enable + return nil + }) + return logbackupEnable, errors.Trace(err) +} + // GetConfigFromTiKV get configs from all alive tikv stores. 
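// The raw HTTP response handed to fn is TiKV's JSON config; a store with log
// backup enabled responds with something like
// {"log-level": "debug", "log-backup": {"enable": true}}, which is what
// IsLogBackupEnabled above feeds into ParseLogBackupEnableFromConfig,
// reporting true only when every store has the flag enabled.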
func (mgr *Mgr) GetConfigFromTiKV(ctx context.Context, cli *http.Client, fn func(*http.Response) error) error { allStores, err := GetAllTiKVStoresWithRetry(ctx, mgr.GetPDClient(), util.SkipTiFlash) diff --git a/br/pkg/conn/conn_test.go b/br/pkg/conn/conn_test.go index 17b678f6c268f..00fe21d60f1e0 100644 --- a/br/pkg/conn/conn_test.go +++ b/br/pkg/conn/conn_test.go @@ -455,6 +455,169 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { } } +func TestIsLogBackupEnabled(t *testing.T) { + cases := []struct { + stores []*metapb.Store + content []string + enable bool + err bool + }{ + { + stores: []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tiflash", + }, + }, + }, + }, + content: []string{""}, + enable: true, + err: false, + }, + { + stores: []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tiflash", + }, + }, + }, + { + Id: 2, + State: metapb.StoreState_Up, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tikv", + }, + }, + }, + }, + content: []string{ + "", + // Assuming the TiKV has failed due to some reason. + "", + }, + enable: false, + err: true, + }, + { + stores: []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tikv", + }, + }, + }, + }, + content: []string{ + "{\"log-level\": \"debug\", \"log-backup\": {\"enable\": true}}", + }, + enable: true, + err: false, + }, + { + stores: []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tikv", + }, + }, + }, + }, + content: []string{ + "{\"log-level\": \"debug\", \"log-backup\": {\"enable\": false}}", + }, + enable: false, + err: false, + }, + { + stores: []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tikv", + }, + }, + }, + { + Id: 2, + State: metapb.StoreState_Up, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tikv", + }, + }, + }, + }, + content: []string{ + "{\"log-level\": \"debug\", \"log-backup\": {\"enable\": true}}", + "{\"log-level\": \"debug\", \"log-backup\": {\"enable\": false}}", + }, + enable: false, + err: false, + }, + } + + pctx := context.Background() + for _, ca := range cases { + ctx, cancel := context.WithCancel(pctx) + pdCli := utils.FakePDClient{Stores: ca.stores} + require.Equal(t, len(ca.content), len(ca.stores)) + count := 0 + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch strings.TrimSpace(r.URL.Path) { + case "/config": + if len(ca.content[count]) == 0 { + cancel() + } + _, _ = fmt.Fprint(w, ca.content[count]) + default: + http.NotFoundHandler().ServeHTTP(w, r) + } + count++ + })) + + for _, s := range ca.stores { + s.Address = mockServer.URL + s.StatusAddress = mockServer.URL + } + + httpCli := mockServer.Client() + mgr := &Mgr{PdController: &pdutil.PdController{}} + mgr.PdController.SetPDClient(pdCli) + enable, err := mgr.IsLogBackupEnabled(ctx, httpCli) + if ca.err { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, ca.enable, enable) + } + mockServer.Close() + } +} + func TestHandleTiKVAddress(t *testing.T) { cases := []struct { store *metapb.Store diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index f9497eb7ac997..5362fce37f19a 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -454,13 +454,8 @@ 
func (s *streamMgr) backupFullSchemas(ctx context.Context) error { return nil } -func (s *streamMgr) checkStreamStartEnable(g glue.Glue) error { - se, err := g.CreateSession(s.mgr.GetStorage()) - if err != nil { - return errors.Trace(err) - } - execCtx := se.GetSessionCtx().GetRestrictedSQLExecutor() - supportStream, err := utils.IsLogBackupEnabled(execCtx) +func (s *streamMgr) checkStreamStartEnable(ctx context.Context) error { + supportStream, err := s.mgr.IsLogBackupEnabled(ctx, s.httpCli) if err != nil { return errors.Trace(err) } @@ -555,7 +550,7 @@ func RunStreamStart( } defer streamMgr.close() - if err = streamMgr.checkStreamStartEnable(g); err != nil { + if err = streamMgr.checkStreamStartEnable(ctx); err != nil { return errors.Trace(err) } if err = streamMgr.adjustAndCheckStartTS(ctx); err != nil { diff --git a/br/pkg/utils/db.go b/br/pkg/utils/db.go index d9ee2cefeba1a..f6d0b66dc7a54 100644 --- a/br/pkg/utils/db.go +++ b/br/pkg/utils/db.go @@ -5,7 +5,6 @@ package utils import ( "context" "strconv" - "strings" "github.com/docker/go-units" "github.com/pingcap/errors" @@ -26,53 +25,6 @@ var ( logBackupTaskCount = atomic.NewInt32(0) ) -// CheckLogBackupEnabled checks if LogBackup is enabled in cluster. -// this mainly used in three places. -// 1. GC worker resolve locks to scan more locks after safepoint. (every minute) -// 2. Add index skipping use lightning.(every add index ddl) -// 3. Telemetry of log backup feature usage (every 6 hours). -// NOTE: this result shouldn't be cached by caller. because it may change every time in one cluster. -func CheckLogBackupEnabled(ctx sessionctx.Context) bool { - executor := ctx.GetRestrictedSQLExecutor() - enabled, err := IsLogBackupEnabled(executor) - if err != nil { - // if it failed by any reason. we can simply return true this time. - // for GC worker it will scan more locks in one tick. - // for Add index it will skip using lightning this time. - // for Telemetry it will get a false positive usage count. - log.Warn("check log backup config failed, ignore it", zap.String("category", "backup"), zap.Error(err)) - return true - } - return enabled -} - -// IsLogBackupEnabled is used for br to check whether tikv has enabled log backup. -// we use `sqlexec.RestrictedSQLExecutor` as parameter because it's easy to mock. -// it should return error. -func IsLogBackupEnabled(ctx sqlexec.RestrictedSQLExecutor) (bool, error) { - valStr := "show config where name = 'log-backup.enable' and type = 'tikv'" - internalCtx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBR) - rows, fields, errSQL := ctx.ExecRestrictedSQL(internalCtx, nil, valStr) - if errSQL != nil { - return false, errSQL - } - if len(rows) == 0 { - // no rows mean not support log backup. 
- return false, nil - } - for _, row := range rows { - d := row.GetDatum(3, &fields[3].Column.FieldType) - value, errField := d.ToString() - if errField != nil { - return false, errField - } - if strings.ToLower(value) == "false" { - return false, nil - } - } - return true, nil -} - func GetRegionSplitInfo(ctx sqlexec.RestrictedSQLExecutor) (uint64, int64) { return GetSplitSize(ctx), GetSplitKeys(ctx) } diff --git a/br/pkg/utils/db_test.go b/br/pkg/utils/db_test.go index a43a8840c9cc4..a3d67a6a7667e 100644 --- a/br/pkg/utils/db_test.go +++ b/br/pkg/utils/db_test.go @@ -51,76 +51,6 @@ func (m *mockRestrictedSQLExecutor) ExecRestrictedSQL(ctx context.Context, opts return nil, nil, nil } -func TestIsLogBackupEnabled(t *testing.T) { - // config format: - // MySQL [(none)]> show config where name="log-backup.enable"; - // +------+-----------------+-------------------+-------+ - // | Type | Instance | Name | Value | - // +------+-----------------+-------------------+-------+ - // | tikv | 127.0.0.1:20161 | log-backup.enable | false | - // | tikv | 127.0.0.1:20162 | log-backup.enable | false | - // | tikv | 127.0.0.1:20160 | log-backup.enable | false | - // +------+-----------------+-------------------+-------+ - fields := make([]*ast.ResultField, 4) - tps := []*types.FieldType{ - types.NewFieldType(mysql.TypeString), - types.NewFieldType(mysql.TypeString), - types.NewFieldType(mysql.TypeString), - types.NewFieldType(mysql.TypeString), - } - for i := 0; i < len(tps); i++ { - rf := new(ast.ResultField) - rf.Column = new(model.ColumnInfo) - rf.Column.FieldType = *tps[i] - fields[i] = rf - } - rows := make([]chunk.Row, 0, 1) - - // case 1: non of tikvs enabled log-backup expected false - // tikv | 127.0.0.1:20161 | log-backup.enable | false | - row := chunk.MutRowFromValues("tikv", " 127.0.0.1:20161", "log-backup.enable", "false").ToRow() - rows = append(rows, row) - s := &mockRestrictedSQLExecutor{rows: rows, fields: fields} - enabled, err := utils.IsLogBackupEnabled(s) - require.NoError(t, err) - require.False(t, enabled) - - // case 2: one of tikvs enabled log-backup expected false - // tikv | 127.0.0.1:20161 | log-backup.enable | false | - // tikv | 127.0.0.1:20162 | log-backup.enable | true | - rows = nil - row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20161", "log-backup.enable", "false").ToRow() - rows = append(rows, row) - row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20162", "log-backup.enable", "true").ToRow() - rows = append(rows, row) - s = &mockRestrictedSQLExecutor{rows: rows, fields: fields} - enabled, err = utils.IsLogBackupEnabled(s) - require.NoError(t, err) - require.False(t, enabled) - - // case 3: all of tikvs enabled log-backup expected true - // tikv | 127.0.0.1:20161 | log-backup.enable | true | - // tikv | 127.0.0.1:20162 | log-backup.enable | true | - // tikv | 127.0.0.1:20163 | log-backup.enable | true | - rows = nil - row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20161", "log-backup.enable", "true").ToRow() - rows = append(rows, row) - row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20162", "log-backup.enable", "true").ToRow() - rows = append(rows, row) - row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20163", "log-backup.enable", "true").ToRow() - rows = append(rows, row) - s = &mockRestrictedSQLExecutor{rows: rows, fields: fields} - enabled, err = utils.IsLogBackupEnabled(s) - require.NoError(t, err) - require.True(t, enabled) - - // case 4: met error and expected false. 
- s = &mockRestrictedSQLExecutor{errHappen: true} - enabled, err = utils.IsLogBackupEnabled(s) - require.Error(t, err) - require.False(t, enabled) -} - func TestCheckLogBackupTaskExist(t *testing.T) { require.False(t, utils.CheckLogBackupTaskExist()) utils.LogBackupTaskCountInc() diff --git a/br/tests/br_pitr_failpoint/run.sh b/br/tests/br_pitr_failpoint/run.sh index 5c81643246d6d..8ce6d4b49b526 100644 --- a/br/tests/br_pitr_failpoint/run.sh +++ b/br/tests/br_pitr_failpoint/run.sh @@ -54,8 +54,6 @@ while true; do retry_cnt=$((retry_cnt+1)) if [ "$retry_cnt" -gt 50 ]; then - run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs';" - cat $res_file echo 'the wait lag is too large' exit 1 fi From 9fe917b2c1bdb67660c939102a2cd878b26d6c99 Mon Sep 17 00:00:00 2001 From: Jianjun Liao Date: Wed, 24 Apr 2024 11:31:52 +0800 Subject: [PATCH 6/6] make bazel_prepare Signed-off-by: Jianjun Liao --- br/pkg/conn/BUILD.bazel | 2 +- br/pkg/utils/BUILD.bazel | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/conn/BUILD.bazel b/br/pkg/conn/BUILD.bazel index 0c7dc388ada74..8f25055538375 100644 --- a/br/pkg/conn/BUILD.bazel +++ b/br/pkg/conn/BUILD.bazel @@ -46,7 +46,7 @@ go_test( ], embed = [":conn"], flaky = True, - shard_count = 7, + shard_count = 8, deps = [ "//br/pkg/config", "//br/pkg/conn/util", diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index 0c24b46141179..edbbc4dcf2bb5 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -77,7 +77,7 @@ go_test( ], embed = [":utils"], flaky = True, - shard_count = 33, + shard_count = 32, deps = [ "//br/pkg/errors", "//pkg/kv",