From e93f996743d6b62ef021bba2043607f46a3c720a Mon Sep 17 00:00:00 2001 From: kennytm Date: Tue, 4 Jun 2024 16:00:44 +0800 Subject: [PATCH] lightning: fix merge conflict Signed-off-by: kennytm --- br/pkg/lightning/common/BUILD.bazel | 4 - br/pkg/lightning/common/util.go | 195 ------------------ br/pkg/lightning/lightning.go | 5 - pkg/lightning/common/util_test.go | 304 ---------------------------- 4 files changed, 508 deletions(-) delete mode 100644 pkg/lightning/common/util_test.go diff --git a/br/pkg/lightning/common/BUILD.bazel b/br/pkg/lightning/common/BUILD.bazel index 10f55207b996b..fad1cabebc2ac 100644 --- a/br/pkg/lightning/common/BUILD.bazel +++ b/br/pkg/lightning/common/BUILD.bazel @@ -95,11 +95,7 @@ go_test( ], embed = [":common"], flaky = True, -<<<<<<< HEAD:br/pkg/lightning/common/BUILD.bazel shard_count = 21, -======= - shard_count = 29, ->>>>>>> 555ce023522 (lightning: Don't log "received task config" in server mode (#52336)):pkg/lightning/common/BUILD.bazel deps = [ "//br/pkg/errors", "//br/pkg/lightning/log", diff --git a/br/pkg/lightning/common/util.go b/br/pkg/lightning/common/util.go index 7ed024bbcb83c..621c59d820e23 100644 --- a/br/pkg/lightning/common/util.go +++ b/br/pkg/lightning/common/util.go @@ -428,198 +428,3 @@ func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo { } return nil } -<<<<<<< HEAD:br/pkg/lightning/common/util.go -======= - -// GetDropIndexInfos returns the index infos that need to be dropped and the remain indexes. -func GetDropIndexInfos( - tblInfo *model.TableInfo, -) (remainIndexes []*model.IndexInfo, dropIndexes []*model.IndexInfo) { - cols := tblInfo.Columns -loop: - for _, idxInfo := range tblInfo.Indices { - if idxInfo.State != model.StatePublic { - remainIndexes = append(remainIndexes, idxInfo) - continue - } - // Primary key is a cluster index. - if idxInfo.Primary && tblInfo.HasClusteredIndex() { - remainIndexes = append(remainIndexes, idxInfo) - continue - } - // Skip index that contains auto-increment column. - // Because auto colum must be defined as a key. - for _, idxCol := range idxInfo.Columns { - flag := cols[idxCol.Offset].GetFlag() - if tmysql.HasAutoIncrementFlag(flag) { - remainIndexes = append(remainIndexes, idxInfo) - continue loop - } - } - dropIndexes = append(dropIndexes, idxInfo) - } - return remainIndexes, dropIndexes -} - -// BuildDropIndexSQL builds the SQL statement to drop index. -func BuildDropIndexSQL(dbName, tableName string, idxInfo *model.IndexInfo) string { - if idxInfo.Primary { - return SprintfWithIdentifiers("ALTER TABLE %s.%s DROP PRIMARY KEY", dbName, tableName) - } - return SprintfWithIdentifiers("ALTER TABLE %s.%s DROP INDEX %s", dbName, tableName, idxInfo.Name.O) -} - -// BuildAddIndexSQL builds the SQL statement to create missing indexes. -// It returns both a single SQL statement that creates all indexes at once, -// and a list of SQL statements that creates each index individually. -func BuildAddIndexSQL( - tableName string, - curTblInfo, - desiredTblInfo *model.TableInfo, -) (singleSQL string, multiSQLs []string) { - addIndexSpecs := make([]string, 0, len(desiredTblInfo.Indices)) -loop: - for _, desiredIdxInfo := range desiredTblInfo.Indices { - for _, curIdxInfo := range curTblInfo.Indices { - if curIdxInfo.Name.L == desiredIdxInfo.Name.L { - continue loop - } - } - - var buf bytes.Buffer - if desiredIdxInfo.Primary { - buf.WriteString("ADD PRIMARY KEY ") - } else if desiredIdxInfo.Unique { - buf.WriteString("ADD UNIQUE KEY ") - } else { - buf.WriteString("ADD KEY ") - } - // "primary" is a special name for primary key, we should not use it as index name. - if desiredIdxInfo.Name.L != "primary" { - buf.WriteString(EscapeIdentifier(desiredIdxInfo.Name.O)) - } - - colStrs := make([]string, 0, len(desiredIdxInfo.Columns)) - for _, col := range desiredIdxInfo.Columns { - var colStr string - if desiredTblInfo.Columns[col.Offset].Hidden { - colStr = fmt.Sprintf("(%s)", desiredTblInfo.Columns[col.Offset].GeneratedExprString) - } else { - colStr = EscapeIdentifier(col.Name.O) - if col.Length != types.UnspecifiedLength { - colStr = fmt.Sprintf("%s(%s)", colStr, strconv.Itoa(col.Length)) - } - } - colStrs = append(colStrs, colStr) - } - fmt.Fprintf(&buf, "(%s)", strings.Join(colStrs, ",")) - - if desiredIdxInfo.Invisible { - fmt.Fprint(&buf, " INVISIBLE") - } - if desiredIdxInfo.Comment != "" { - fmt.Fprintf(&buf, ` COMMENT '%s'`, format.OutputFormat(desiredIdxInfo.Comment)) - } - addIndexSpecs = append(addIndexSpecs, buf.String()) - } - if len(addIndexSpecs) == 0 { - return "", nil - } - - singleSQL = fmt.Sprintf("ALTER TABLE %s %s", tableName, strings.Join(addIndexSpecs, ", ")) - for _, spec := range addIndexSpecs { - multiSQLs = append(multiSQLs, fmt.Sprintf("ALTER TABLE %s %s", tableName, spec)) - } - return singleSQL, multiSQLs -} - -// IsDupKeyError checks if err is a duplicate index error. -func IsDupKeyError(err error) bool { - if merr, ok := errors.Cause(err).(*mysql.MySQLError); ok { - switch merr.Number { - case errno.ErrDupKeyName, errno.ErrMultiplePriKey, errno.ErrDupUnique: - return true - } - } - return false -} - -// GetBackoffWeightFromDB gets the backoff weight from database. -func GetBackoffWeightFromDB(ctx context.Context, db *sql.DB) (int, error) { - val, err := getSessionVariable(ctx, db, variable.TiDBBackOffWeight) - if err != nil { - return 0, err - } - return strconv.Atoi(val) -} - -// GetExplicitRequestSourceTypeFromDB gets the explicit request source type from database. -func GetExplicitRequestSourceTypeFromDB(ctx context.Context, db *sql.DB) (string, error) { - return getSessionVariable(ctx, db, variable.TiDBExplicitRequestSourceType) -} - -// copy from dbutil to avoid import cycle -func getSessionVariable(ctx context.Context, db *sql.DB, variable string) (value string, err error) { - query := fmt.Sprintf("SHOW VARIABLES LIKE '%s'", variable) - rows, err := db.QueryContext(ctx, query) - - if err != nil { - return "", errors.Trace(err) - } - defer rows.Close() - - // Show an example. - /* - mysql> SHOW VARIABLES LIKE "binlog_format"; - +---------------+-------+ - | Variable_name | Value | - +---------------+-------+ - | binlog_format | ROW | - +---------------+-------+ - */ - - for rows.Next() { - if err = rows.Scan(&variable, &value); err != nil { - return "", errors.Trace(err) - } - } - - if err := rows.Err(); err != nil { - return "", errors.Trace(err) - } - - return value, nil -} - -// IsFunctionNotExistErr checks if err is a function not exist error. -func IsFunctionNotExistErr(err error, functionName string) bool { - return err != nil && - (strings.Contains(err.Error(), "No database selected") || - strings.Contains(err.Error(), fmt.Sprintf("%s does not exist", functionName))) -} - -// IsRaftKV2 checks whether the raft-kv2 is enabled -func IsRaftKV2(ctx context.Context, db *sql.DB) (bool, error) { - var ( - getRaftKvVersionSQL = "show config where type = 'tikv' and name = 'storage.engine'" - raftKv2 = "raft-kv2" - tp, instance, name, value string - ) - - rows, err := db.QueryContext(ctx, getRaftKvVersionSQL) - if err != nil { - return false, errors.Trace(err) - } - defer rows.Close() - - for rows.Next() { - if err = rows.Scan(&tp, &instance, &name, &value); err != nil { - return false, errors.Trace(err) - } - if value == raftKv2 { - return true, nil - } - } - return false, rows.Err() -} ->>>>>>> 555ce023522 (lightning: Don't log "received task config" in server mode (#52336)):pkg/lightning/common/util.go diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index 7041493ab5249..a00ea10efdcfc 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -693,12 +693,7 @@ func (l *Lightning) handlePostTask(w http.ResponseWriter, req *http.Request) { writeJSONError(w, http.StatusBadRequest, "cannot read request", err) return } -<<<<<<< HEAD:br/pkg/lightning/lightning.go - filteredData := utils.HideSensitive(string(data)) - log.L().Info("received task config", zap.String("content", filteredData)) -======= log.L().Info("received task config") ->>>>>>> 555ce023522 (lightning: Don't log "received task config" in server mode (#52336)):lightning/pkg/server/lightning.go cfg := config.NewConfig() if err = cfg.LoadFromGlobal(l.globalCfg); err != nil { diff --git a/pkg/lightning/common/util_test.go b/pkg/lightning/common/util_test.go deleted file mode 100644 index d8852bed1c358..0000000000000 --- a/pkg/lightning/common/util_test.go +++ /dev/null @@ -1,304 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package common_test - -import ( - "context" - "encoding/base64" - "encoding/json" - "fmt" - "io" - "net/http" - "net/http/httptest" - "testing" - "time" - - "github.com/DATA-DOG/go-sqlmock" - "github.com/go-sql-driver/mysql" - "github.com/pingcap/errors" - "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/pkg/lightning/common" - "github.com/pingcap/tidb/pkg/lightning/log" - "github.com/pingcap/tidb/pkg/parser" - "github.com/pingcap/tidb/pkg/util/dbutil/dbutiltest" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestDirNotExist(t *testing.T) { - require.True(t, common.IsDirExists(".")) - require.False(t, common.IsDirExists("not-exists")) -} - -func TestGetJSON(t *testing.T) { - type TestPayload struct { - Username string `json:"username"` - Password string `json:"password"` - } - request := TestPayload{ - Username: "lightning", - Password: "lightning-ctl", - } - - ctx := context.Background() - // Mock success response - handle := func(res http.ResponseWriter, _ *http.Request) { - res.WriteHeader(http.StatusOK) - err := json.NewEncoder(res).Encode(request) - require.NoError(t, err) - } - testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { - handle(res, req) - })) - defer testServer.Close() - - client := &http.Client{Timeout: time.Second} - - response := TestPayload{} - err := common.GetJSON(ctx, client, "http://localhost:1", &response) - require.Error(t, err) - err = common.GetJSON(ctx, client, testServer.URL, &response) - require.NoError(t, err) - require.Equal(t, request, response) - - // Mock `StatusNoContent` response - handle = func(res http.ResponseWriter, _ *http.Request) { - res.WriteHeader(http.StatusNoContent) - } - err = common.GetJSON(ctx, client, testServer.URL, &response) - require.Error(t, err) - require.Regexp(t, ".*http status code != 200.*", err.Error()) -} - -func TestConnect(t *testing.T) { - plainPsw := "dQAUoDiyb1ucWZk7" - - require.NoError(t, failpoint.Enable( - "github.com/pingcap/tidb/pkg/lightning/common/MustMySQLPassword", - fmt.Sprintf("return(\"%s\")", plainPsw))) - defer func() { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/lightning/common/MustMySQLPassword")) - }() - - param := common.MySQLConnectParam{ - Host: "127.0.0.1", - Port: 4000, - User: "root", - Password: plainPsw, - SQLMode: "strict", - MaxAllowedPacket: 1234, - } - _, err := param.Connect() - require.NoError(t, err) - param.Password = base64.StdEncoding.EncodeToString([]byte(plainPsw)) - _, err = param.Connect() - require.NoError(t, err) -} - -func TestIsContextCanceledError(t *testing.T) { - require.True(t, common.IsContextCanceledError(context.Canceled)) - require.False(t, common.IsContextCanceledError(io.EOF)) -} - -func TestUniqueTable(t *testing.T) { - tableName := common.UniqueTable("test", "t1") - require.Equal(t, "`test`.`t1`", tableName) - - tableName = common.UniqueTable("test", "t`1") - require.Equal(t, "`test`.`t``1`", tableName) -} - -func TestSQLWithRetry(t *testing.T) { - db, mock, err := sqlmock.New() - require.NoError(t, err) - defer db.Close() - - sqlWithRetry := &common.SQLWithRetry{ - DB: db, - Logger: log.L(), - } - aValue := new(int) - - // retry defaultMaxRetry times and still failed - for i := 0; i < 3; i++ { - mock.ExpectQuery("select a from test.t1").WillReturnError(errors.Annotate(mysql.ErrInvalidConn, "mock error")) - } - err = sqlWithRetry.QueryRow(context.Background(), "", "select a from test.t1", aValue) - require.Regexp(t, ".*mock error", err.Error()) - - // meet unretryable error and will return directly - mock.ExpectQuery("select a from test.t1").WillReturnError(context.Canceled) - err = sqlWithRetry.QueryRow(context.Background(), "", "select a from test.t1", aValue) - require.Regexp(t, ".*context canceled", err.Error()) - - // query success - rows := sqlmock.NewRows([]string{"a"}).AddRow("1") - mock.ExpectQuery("select a from test.t1").WillReturnRows(rows) - - err = sqlWithRetry.QueryRow(context.Background(), "", "select a from test.t1", aValue) - require.NoError(t, err) - require.Equal(t, 1, *aValue) - - // test Exec - mock.ExpectExec("delete from").WillReturnError(context.Canceled) - err = sqlWithRetry.Exec(context.Background(), "", "delete from test.t1 where id = ?", 2) - require.Regexp(t, ".*context canceled", err.Error()) - - mock.ExpectExec("delete from").WillReturnResult(sqlmock.NewResult(0, 1)) - err = sqlWithRetry.Exec(context.Background(), "", "delete from test.t1 where id = ?", 2) - require.NoError(t, err) - - require.Nil(t, mock.ExpectationsWereMet()) -} - -func TestInterpolateMySQLString(t *testing.T) { - assert.Equal(t, "'123'", common.InterpolateMySQLString("123")) - assert.Equal(t, "'1''23'", common.InterpolateMySQLString("1'23")) - assert.Equal(t, "'1''2''''3'", common.InterpolateMySQLString("1'2''3")) -} - -func TestGetAutoRandomColumn(t *testing.T) { - tests := []struct { - ddl string - colName string - }{ - {"create table t(c int)", ""}, - {"create table t(c int auto_increment)", ""}, - {"create table t(c bigint auto_random primary key)", "c"}, - {"create table t(a int, c bigint auto_random primary key)", "c"}, - {"create table t(c bigint auto_random, a int, primary key(c,a))", "c"}, - {"create table t(a int, c bigint auto_random, primary key(c,a))", "c"}, - } - p := parser.New() - for _, tt := range tests { - tableInfo, err := dbutiltest.GetTableInfoBySQL(tt.ddl, p) - require.NoError(t, err) - col := common.GetAutoRandomColumn(tableInfo) - if tt.colName == "" { - require.Nil(t, col, tt.ddl) - } else { - require.Equal(t, tt.colName, col.Name.L, tt.ddl) - } - } -} - -func TestBuildAddIndexSQL(t *testing.T) { - tests := []struct { - table string - current string - desired string - singleSQL string - multiSQLs []string - }{ - { - table: "`test`.`non_pk_auto_inc`", - current: `CREATE TABLE non_pk_auto_inc ( - pk varchar(255), - id int(11) NOT NULL AUTO_INCREMENT, - UNIQUE KEY uniq_id (id) - )`, - desired: `CREATE TABLE non_pk_auto_inc ( - pk varchar(255) PRIMARY KEY NONCLUSTERED, - id int(11) NOT NULL AUTO_INCREMENT, - UNIQUE KEY uniq_id (id) - )`, - singleSQL: "ALTER TABLE `test`.`non_pk_auto_inc` ADD PRIMARY KEY (`pk`)", - multiSQLs: []string{"ALTER TABLE `test`.`non_pk_auto_inc` ADD PRIMARY KEY (`pk`)"}, - }, - { - table: "`test`.`multi_indexes`", - current: ` -CREATE TABLE multi_indexes ( - c1 bigint PRIMARY KEY CLUSTERED, - c2 varchar(255) NOT NULL, - c3 varchar(255) NOT NULL, - c4 varchar(255) NOT NULL, - c5 varchar(255) NOT NULL, - c6 varchar(255) NOT NULL, - c7 varchar(255) NOT NULL, - c8 varchar(255) NOT NULL, - c9 varchar(255) NOT NULL, - c10 varchar(255) NOT NULL, - c11 varchar(255) NOT NULL -) -`, - desired: ` -CREATE TABLE multi_indexes ( - c1 bigint PRIMARY KEY CLUSTERED, - c2 varchar(255) NOT NULL UNIQUE KEY, - c3 varchar(255) NOT NULL, - c4 varchar(255) NOT NULL, - c5 varchar(255) NOT NULL, - c6 varchar(255) NOT NULL, - c7 varchar(255) NOT NULL, - c8 varchar(255) NOT NULL, - c9 varchar(255) NOT NULL, - c10 varchar(255) NOT NULL, - c11 varchar(255) NOT NULL, - INDEX idx_c2 (c2) COMMENT 'single column index', - INDEX idx_c2_c3(c2, c3) COMMENT 'multiple column index', - UNIQUE KEY uniq_c4 (c4) COMMENT 'single column unique key', - UNIQUE KEY uniq_c4_c5 (c4, c5) COMMENT 'multiple column unique key', - INDEX idx_c6 (c6 ASC) COMMENT 'single column index with asc order', - INDEX idx_c7 (c7 DESC) COMMENT 'single column index with desc order', - INDEX idx_c6_c7 (c6 ASC, c7 DESC) COMMENT 'multiple column index with asc and desc order', - INDEX idx_c8 (c8) VISIBLE COMMENT 'single column index with visible', - INDEX idx_c9 (c9) INVISIBLE COMMENT 'single column index with invisible', - INDEX idx_lower_c10 ((lower(c10))) COMMENT 'single column index with function', - INDEX idx_prefix_c11 (c11(3)) COMMENT 'single column index with prefix' -);`, - singleSQL: "ALTER TABLE `test`.`multi_indexes` ADD KEY `idx_c2`(`c2`) COMMENT 'single column index'" + - ", ADD KEY `idx_c2_c3`(`c2`,`c3`) COMMENT 'multiple column index'" + - ", ADD UNIQUE KEY `uniq_c4`(`c4`) COMMENT 'single column unique key'" + - ", ADD UNIQUE KEY `uniq_c4_c5`(`c4`,`c5`) COMMENT 'multiple column unique key'" + - ", ADD KEY `idx_c6`(`c6`) COMMENT 'single column index with asc order'" + - ", ADD KEY `idx_c7`(`c7`) COMMENT 'single column index with desc order'" + - ", ADD KEY `idx_c6_c7`(`c6`,`c7`) COMMENT 'multiple column index with asc and desc order'" + - ", ADD KEY `idx_c8`(`c8`) COMMENT 'single column index with visible'" + - ", ADD KEY `idx_c9`(`c9`) INVISIBLE COMMENT 'single column index with invisible'" + - ", ADD KEY `idx_lower_c10`((lower(`c10`))) COMMENT 'single column index with function'" + - ", ADD KEY `idx_prefix_c11`(`c11`(3)) COMMENT 'single column index with prefix'" + - ", ADD UNIQUE KEY `c2`(`c2`)", - multiSQLs: []string{ - "ALTER TABLE `test`.`multi_indexes` ADD KEY `idx_c2`(`c2`) COMMENT 'single column index'", - "ALTER TABLE `test`.`multi_indexes` ADD KEY `idx_c2_c3`(`c2`,`c3`) COMMENT 'multiple column index'", - "ALTER TABLE `test`.`multi_indexes` ADD UNIQUE KEY `uniq_c4`(`c4`) COMMENT 'single column unique key'", - "ALTER TABLE `test`.`multi_indexes` ADD UNIQUE KEY `uniq_c4_c5`(`c4`,`c5`) COMMENT 'multiple column unique key'", - "ALTER TABLE `test`.`multi_indexes` ADD KEY `idx_c6`(`c6`) COMMENT 'single column index with asc order'", - "ALTER TABLE `test`.`multi_indexes` ADD KEY `idx_c7`(`c7`) COMMENT 'single column index with desc order'", - "ALTER TABLE `test`.`multi_indexes` ADD KEY `idx_c6_c7`(`c6`,`c7`)" + - " COMMENT 'multiple column index with asc and desc order'", - "ALTER TABLE `test`.`multi_indexes` ADD KEY `idx_c8`(`c8`) COMMENT 'single column index with visible'", - "ALTER TABLE `test`.`multi_indexes` ADD KEY `idx_c9`(`c9`) INVISIBLE COMMENT 'single column index with invisible'", - "ALTER TABLE `test`.`multi_indexes` ADD KEY `idx_lower_c10`((lower(`c10`)))" + - " COMMENT 'single column index with function'", - "ALTER TABLE `test`.`multi_indexes` ADD KEY `idx_prefix_c11`(`c11`(3)) COMMENT 'single column index with prefix'", - "ALTER TABLE `test`.`multi_indexes` ADD UNIQUE KEY `c2`(`c2`)", - }, - }} - - p := parser.New() - - for _, tt := range tests { - curTblInfo, err := dbutiltest.GetTableInfoBySQL(tt.current, p) - require.NoError(t, err) - desiredTblInfo, err := dbutiltest.GetTableInfoBySQL(tt.desired, p) - require.NoError(t, err) - - singleSQL, multiSQLs := common.BuildAddIndexSQL(tt.table, curTblInfo, desiredTblInfo) - require.Equal(t, tt.singleSQL, singleSQL) - require.Equal(t, tt.multiSQLs, multiSQLs) - } -}