Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#52336
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
kennytm authored and ti-chi-bot committed Jun 4, 2024
1 parent 42669ad commit 7a25fab
Show file tree
Hide file tree
Showing 4 changed files with 507 additions and 0 deletions.
4 changes: 4 additions & 0 deletions br/pkg/lightning/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ go_test(
],
embed = [":common"],
flaky = True,
<<<<<<< HEAD:br/pkg/lightning/common/BUILD.bazel
shard_count = 21,
=======
shard_count = 29,
>>>>>>> 555ce023522 (lightning: Don't log "received task config" in server mode (#52336)):pkg/lightning/common/BUILD.bazel
deps = [
"//br/pkg/errors",
"//br/pkg/lightning/log",
Expand Down
195 changes: 195 additions & 0 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,3 +428,198 @@ func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo {
}
return nil
}
<<<<<<< HEAD:br/pkg/lightning/common/util.go
=======

// GetDropIndexInfos returns the index infos that need to be dropped and the remain indexes.
func GetDropIndexInfos(
tblInfo *model.TableInfo,
) (remainIndexes []*model.IndexInfo, dropIndexes []*model.IndexInfo) {
cols := tblInfo.Columns
loop:
for _, idxInfo := range tblInfo.Indices {
if idxInfo.State != model.StatePublic {
remainIndexes = append(remainIndexes, idxInfo)
continue
}
// Primary key is a cluster index.
if idxInfo.Primary && tblInfo.HasClusteredIndex() {
remainIndexes = append(remainIndexes, idxInfo)
continue
}
// Skip index that contains auto-increment column.
// Because auto colum must be defined as a key.
for _, idxCol := range idxInfo.Columns {
flag := cols[idxCol.Offset].GetFlag()
if tmysql.HasAutoIncrementFlag(flag) {
remainIndexes = append(remainIndexes, idxInfo)
continue loop
}
}
dropIndexes = append(dropIndexes, idxInfo)
}
return remainIndexes, dropIndexes
}

// BuildDropIndexSQL builds the SQL statement to drop index.
func BuildDropIndexSQL(dbName, tableName string, idxInfo *model.IndexInfo) string {
if idxInfo.Primary {
return SprintfWithIdentifiers("ALTER TABLE %s.%s DROP PRIMARY KEY", dbName, tableName)
}
return SprintfWithIdentifiers("ALTER TABLE %s.%s DROP INDEX %s", dbName, tableName, idxInfo.Name.O)
}

// BuildAddIndexSQL builds the SQL statement to create missing indexes.
// It returns both a single SQL statement that creates all indexes at once,
// and a list of SQL statements that creates each index individually.
func BuildAddIndexSQL(
tableName string,
curTblInfo,
desiredTblInfo *model.TableInfo,
) (singleSQL string, multiSQLs []string) {
addIndexSpecs := make([]string, 0, len(desiredTblInfo.Indices))
loop:
for _, desiredIdxInfo := range desiredTblInfo.Indices {
for _, curIdxInfo := range curTblInfo.Indices {
if curIdxInfo.Name.L == desiredIdxInfo.Name.L {
continue loop
}
}

var buf bytes.Buffer
if desiredIdxInfo.Primary {
buf.WriteString("ADD PRIMARY KEY ")
} else if desiredIdxInfo.Unique {
buf.WriteString("ADD UNIQUE KEY ")
} else {
buf.WriteString("ADD KEY ")
}
// "primary" is a special name for primary key, we should not use it as index name.
if desiredIdxInfo.Name.L != "primary" {
buf.WriteString(EscapeIdentifier(desiredIdxInfo.Name.O))
}

colStrs := make([]string, 0, len(desiredIdxInfo.Columns))
for _, col := range desiredIdxInfo.Columns {
var colStr string
if desiredTblInfo.Columns[col.Offset].Hidden {
colStr = fmt.Sprintf("(%s)", desiredTblInfo.Columns[col.Offset].GeneratedExprString)
} else {
colStr = EscapeIdentifier(col.Name.O)
if col.Length != types.UnspecifiedLength {
colStr = fmt.Sprintf("%s(%s)", colStr, strconv.Itoa(col.Length))
}
}
colStrs = append(colStrs, colStr)
}
fmt.Fprintf(&buf, "(%s)", strings.Join(colStrs, ","))

if desiredIdxInfo.Invisible {
fmt.Fprint(&buf, " INVISIBLE")
}
if desiredIdxInfo.Comment != "" {
fmt.Fprintf(&buf, ` COMMENT '%s'`, format.OutputFormat(desiredIdxInfo.Comment))
}
addIndexSpecs = append(addIndexSpecs, buf.String())
}
if len(addIndexSpecs) == 0 {
return "", nil
}

singleSQL = fmt.Sprintf("ALTER TABLE %s %s", tableName, strings.Join(addIndexSpecs, ", "))
for _, spec := range addIndexSpecs {
multiSQLs = append(multiSQLs, fmt.Sprintf("ALTER TABLE %s %s", tableName, spec))
}
return singleSQL, multiSQLs
}

// IsDupKeyError checks if err is a duplicate index error.
func IsDupKeyError(err error) bool {
if merr, ok := errors.Cause(err).(*mysql.MySQLError); ok {
switch merr.Number {
case errno.ErrDupKeyName, errno.ErrMultiplePriKey, errno.ErrDupUnique:
return true
}
}
return false
}

// GetBackoffWeightFromDB gets the backoff weight from database.
func GetBackoffWeightFromDB(ctx context.Context, db *sql.DB) (int, error) {
val, err := getSessionVariable(ctx, db, variable.TiDBBackOffWeight)
if err != nil {
return 0, err
}
return strconv.Atoi(val)
}

// GetExplicitRequestSourceTypeFromDB gets the explicit request source type from database.
func GetExplicitRequestSourceTypeFromDB(ctx context.Context, db *sql.DB) (string, error) {
return getSessionVariable(ctx, db, variable.TiDBExplicitRequestSourceType)
}

// copy from dbutil to avoid import cycle
func getSessionVariable(ctx context.Context, db *sql.DB, variable string) (value string, err error) {
query := fmt.Sprintf("SHOW VARIABLES LIKE '%s'", variable)
rows, err := db.QueryContext(ctx, query)

if err != nil {
return "", errors.Trace(err)
}
defer rows.Close()

// Show an example.
/*
mysql> SHOW VARIABLES LIKE "binlog_format";
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
*/

for rows.Next() {
if err = rows.Scan(&variable, &value); err != nil {
return "", errors.Trace(err)
}
}

if err := rows.Err(); err != nil {
return "", errors.Trace(err)
}

return value, nil
}

// IsFunctionNotExistErr checks if err is a function not exist error.
func IsFunctionNotExistErr(err error, functionName string) bool {
return err != nil &&
(strings.Contains(err.Error(), "No database selected") ||
strings.Contains(err.Error(), fmt.Sprintf("%s does not exist", functionName)))
}

// IsRaftKV2 checks whether the raft-kv2 is enabled
func IsRaftKV2(ctx context.Context, db *sql.DB) (bool, error) {
var (
getRaftKvVersionSQL = "show config where type = 'tikv' and name = 'storage.engine'"
raftKv2 = "raft-kv2"
tp, instance, name, value string
)

rows, err := db.QueryContext(ctx, getRaftKvVersionSQL)
if err != nil {
return false, errors.Trace(err)
}
defer rows.Close()

for rows.Next() {
if err = rows.Scan(&tp, &instance, &name, &value); err != nil {
return false, errors.Trace(err)
}
if value == raftKv2 {
return true, nil
}
}
return false, rows.Err()
}
>>>>>>> 555ce023522 (lightning: Don't log "received task config" in server mode (#52336)):pkg/lightning/common/util.go
4 changes: 4 additions & 0 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,8 +693,12 @@ func (l *Lightning) handlePostTask(w http.ResponseWriter, req *http.Request) {
writeJSONError(w, http.StatusBadRequest, "cannot read request", err)
return
}
<<<<<<< HEAD:br/pkg/lightning/lightning.go
filteredData := utils.HideSensitive(string(data))
log.L().Info("received task config", zap.String("content", filteredData))
=======
log.L().Info("received task config")
>>>>>>> 555ce023522 (lightning: Don't log "received task config" in server mode (#52336)):lightning/pkg/server/lightning.go

cfg := config.NewConfig()
if err = cfg.LoadFromGlobal(l.globalCfg); err != nil {
Expand Down
Loading

0 comments on commit 7a25fab

Please sign in to comment.