diff --git a/dumpling/export/BUILD.bazel b/dumpling/export/BUILD.bazel index 08cc7fe2e664e..fc3006cb855c0 100644 --- a/dumpling/export/BUILD.bazel +++ b/dumpling/export/BUILD.bazel @@ -106,6 +106,7 @@ go_test( "@com_github_data_dog_go_sqlmock//:go-sqlmock", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_prometheus_client_golang//prometheus/collectors", "@com_github_stretchr_testify//require", "@org_golang_x_sync//errgroup", diff --git a/dumpling/export/dump.go b/dumpling/export/dump.go index 7e5a81e0f3ce1..b5f5a00af1974 100644 --- a/dumpling/export/dump.go +++ b/dumpling/export/dump.go @@ -1531,7 +1531,7 @@ func setSessionParam(d *Dumper) error { d.L().Info("cannot check whether TiDB has TiKV, will apply tidb_snapshot by default. This won't affect dump process", log.ShortError(err)) } if conf.ServerInfo.HasTiKV { - sessionParam["tidb_snapshot"] = snapshot + sessionParam[snapshotVar] = snapshot } } } diff --git a/dumpling/export/dump_test.go b/dumpling/export/dump_test.go index c9a40bba28d6f..7cbc52e341451 100644 --- a/dumpling/export/dump_test.go +++ b/dumpling/export/dump_test.go @@ -9,7 +9,9 @@ import ( "time" "github.com/DATA-DOG/go-sqlmock" + "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/version" tcontext "github.com/pingcap/tidb/dumpling/context" "github.com/pingcap/tidb/parser" @@ -224,3 +226,64 @@ func TestUnregisterMetrics(t *testing.T) { // should not panic require.Error(t, err) } + +func TestSetSessionParams(t *testing.T) { + // case 1: fail to set tidb_snapshot, should return error with hint + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer func() { + require.NoError(t, db.Close()) + }() + + mock.ExpectQuery("SELECT @@tidb_config"). + WillReturnError(errors.New("mock error")) + mock.ExpectQuery("SELECT COUNT\\(1\\) as c FROM MYSQL.TiDB WHERE VARIABLE_NAME='tikv_gc_safe_point'"). + WillReturnError(errors.New("mock error")) + tikvErr := &mysql.MySQLError{ + Number: 1105, + Message: "can not get 'tikv_gc_safe_point'", + } + mock.ExpectExec("SET SESSION tidb_snapshot"). + WillReturnError(tikvErr) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/dumpling/export/SkipResetDB", "return(true)")) + defer failpoint.Disable("github.com/pingcap/tidb/dumpling/export/SkipResetDB=return(true)") + + tctx, cancel := tcontext.Background().WithLogger(appLogger).WithCancel() + defer cancel() + + conf := DefaultConfig() + conf.ServerInfo = version.ServerInfo{ + ServerType: version.ServerTypeTiDB, + HasTiKV: false, + } + conf.Snapshot = "439153276059648000" + conf.Consistency = ConsistencyTypeSnapshot + d := &Dumper{ + tctx: tctx, + conf: conf, + cancelCtx: cancel, + dbHandle: db, + } + err = setSessionParam(d) + require.ErrorContains(t, err, "consistency=none") + + // case 2: fail to set other + conf.ServerInfo = version.ServerInfo{ + ServerType: version.ServerTypeMySQL, + HasTiKV: false, + } + conf.Snapshot = "" + conf.Consistency = ConsistencyTypeFlush + conf.SessionParams = map[string]interface{}{ + "mock": "UTC", + } + d.dbHandle = db + mock.ExpectExec("SET SESSION mock"). + WillReturnError(errors.New("Unknown system variable mock")) + mock.ExpectClose() + mock.ExpectClose() + + err = setSessionParam(d) + require.NoError(t, err) +} diff --git a/dumpling/export/sql.go b/dumpling/export/sql.go index 837bec568b9a7..60d14ac49e14c 100644 --- a/dumpling/export/sql.go +++ b/dumpling/export/sql.go @@ -29,6 +29,7 @@ import ( const ( orderByTiDBRowID = "ORDER BY `_tidb_rowid`" + snapshotVar = "tidb_snapshot" ) type listTableType int @@ -851,7 +852,9 @@ func resetDBWithSessionParams(tctx *tcontext.Context, db *sql.DB, cfg *mysql.Con s := fmt.Sprintf("SET SESSION %s = ?", k) _, err := db.ExecContext(tctx, s, pv) if err != nil { - if isUnknownSystemVariableErr(err) { + if k == snapshotVar { + err = errors.Annotate(err, "fail to set snapshot for tidb, please set --consistency=none/--consistency=lock or fix snapshot problem") + } else if isUnknownSystemVariableErr(err) { tctx.L().Info("session variable is not supported by db", zap.String("variable", k), zap.Reflect("value", v)) continue } @@ -876,6 +879,9 @@ func resetDBWithSessionParams(tctx *tcontext.Context, db *sql.DB, cfg *mysql.Con } cfg.Params[k] = s } + failpoint.Inject("SkipResetDB", func(_ failpoint.Value) { + failpoint.Return(db, nil) + }) db.Close() c, err := mysql.NewConnector(cfg)