Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#52336
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
kennytm authored and ti-chi-bot committed Apr 11, 2024
1 parent fd9583b commit e937912
Show file tree
Hide file tree
Showing 4 changed files with 347 additions and 0 deletions.
4 changes: 4 additions & 0 deletions br/pkg/lightning/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ go_test(
],
embed = [":common"],
flaky = True,
<<<<<<< HEAD:br/pkg/lightning/common/BUILD.bazel
shard_count = 21,
=======
shard_count = 29,
>>>>>>> 555ce023522 (lightning: Don't log "received task config" in server mode (#52336)):pkg/lightning/common/BUILD.bazel
deps = [
"//br/pkg/errors",
"//br/pkg/lightning/log",
Expand Down
35 changes: 35 additions & 0 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,3 +531,38 @@ func getSessionVariable(ctx context.Context, db *sql.DB, variable string) (value

return value, nil
}
<<<<<<< HEAD:br/pkg/lightning/common/util.go
=======

// IsFunctionNotExistErr checks if err is a function not exist error.
func IsFunctionNotExistErr(err error, functionName string) bool {
return err != nil &&
(strings.Contains(err.Error(), "No database selected") ||
strings.Contains(err.Error(), fmt.Sprintf("%s does not exist", functionName)))
}

// IsRaftKV2 checks whether the raft-kv2 is enabled
func IsRaftKV2(ctx context.Context, db *sql.DB) (bool, error) {
var (
getRaftKvVersionSQL = "show config where type = 'tikv' and name = 'storage.engine'"
raftKv2 = "raft-kv2"
tp, instance, name, value string
)

rows, err := db.QueryContext(ctx, getRaftKvVersionSQL)
if err != nil {
return false, errors.Trace(err)
}
defer rows.Close()

for rows.Next() {
if err = rows.Scan(&tp, &instance, &name, &value); err != nil {
return false, errors.Trace(err)
}
if value == raftKv2 {
return true, nil
}
}
return false, rows.Err()
}
>>>>>>> 555ce023522 (lightning: Don't log "received task config" in server mode (#52336)):pkg/lightning/common/util.go
4 changes: 4 additions & 0 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,8 +753,12 @@ func (l *Lightning) handlePostTask(w http.ResponseWriter, req *http.Request) {
writeJSONError(w, http.StatusBadRequest, "cannot read request", err)
return
}
<<<<<<< HEAD:br/pkg/lightning/lightning.go
filteredData := utils.HideSensitive(string(data))
log.L().Info("received task config", zap.String("content", filteredData))
=======
log.L().Info("received task config")
>>>>>>> 555ce023522 (lightning: Don't log "received task config" in server mode (#52336)):lightning/pkg/server/lightning.go

cfg := config.NewConfig()
if err = cfg.LoadFromGlobal(l.globalCfg); err != nil {
Expand Down
304 changes: 304 additions & 0 deletions pkg/lightning/common/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common_test

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/DATA-DOG/go-sqlmock"
"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/util/dbutil/dbutiltest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDirNotExist(t *testing.T) {
require.True(t, common.IsDirExists("."))
require.False(t, common.IsDirExists("not-exists"))
}

func TestGetJSON(t *testing.T) {
type TestPayload struct {
Username string `json:"username"`
Password string `json:"password"`
}
request := TestPayload{
Username: "lightning",
Password: "lightning-ctl",
}

ctx := context.Background()
// Mock success response
handle := func(res http.ResponseWriter, _ *http.Request) {
res.WriteHeader(http.StatusOK)
err := json.NewEncoder(res).Encode(request)
require.NoError(t, err)
}
testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
handle(res, req)
}))
defer testServer.Close()

client := &http.Client{Timeout: time.Second}

response := TestPayload{}
err := common.GetJSON(ctx, client, "http://localhost:1", &response)
require.Error(t, err)
err = common.GetJSON(ctx, client, testServer.URL, &response)
require.NoError(t, err)
require.Equal(t, request, response)

// Mock `StatusNoContent` response
handle = func(res http.ResponseWriter, _ *http.Request) {
res.WriteHeader(http.StatusNoContent)
}
err = common.GetJSON(ctx, client, testServer.URL, &response)
require.Error(t, err)
require.Regexp(t, ".*http status code != 200.*", err.Error())
}

func TestConnect(t *testing.T) {
plainPsw := "dQAUoDiyb1ucWZk7"

require.NoError(t, failpoint.Enable(
"github.com/pingcap/tidb/pkg/lightning/common/MustMySQLPassword",
fmt.Sprintf("return(\"%s\")", plainPsw)))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/lightning/common/MustMySQLPassword"))
}()

param := common.MySQLConnectParam{
Host: "127.0.0.1",
Port: 4000,
User: "root",
Password: plainPsw,
SQLMode: "strict",
MaxAllowedPacket: 1234,
}
_, err := param.Connect()
require.NoError(t, err)
param.Password = base64.StdEncoding.EncodeToString([]byte(plainPsw))
_, err = param.Connect()
require.NoError(t, err)
}

func TestIsContextCanceledError(t *testing.T) {
require.True(t, common.IsContextCanceledError(context.Canceled))
require.False(t, common.IsContextCanceledError(io.EOF))
}

func TestUniqueTable(t *testing.T) {
tableName := common.UniqueTable("test", "t1")
require.Equal(t, "`test`.`t1`", tableName)

tableName = common.UniqueTable("test", "t`1")
require.Equal(t, "`test`.`t``1`", tableName)
}

func TestSQLWithRetry(t *testing.T) {
db, mock, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()

sqlWithRetry := &common.SQLWithRetry{
DB: db,
Logger: log.L(),
}
aValue := new(int)

// retry defaultMaxRetry times and still failed
for i := 0; i < 3; i++ {
mock.ExpectQuery("select a from test.t1").WillReturnError(errors.Annotate(mysql.ErrInvalidConn, "mock error"))
}
err = sqlWithRetry.QueryRow(context.Background(), "", "select a from test.t1", aValue)
require.Regexp(t, ".*mock error", err.Error())

// meet unretryable error and will return directly
mock.ExpectQuery("select a from test.t1").WillReturnError(context.Canceled)
err = sqlWithRetry.QueryRow(context.Background(), "", "select a from test.t1", aValue)
require.Regexp(t, ".*context canceled", err.Error())

// query success
rows := sqlmock.NewRows([]string{"a"}).AddRow("1")
mock.ExpectQuery("select a from test.t1").WillReturnRows(rows)

err = sqlWithRetry.QueryRow(context.Background(), "", "select a from test.t1", aValue)
require.NoError(t, err)
require.Equal(t, 1, *aValue)

// test Exec
mock.ExpectExec("delete from").WillReturnError(context.Canceled)
err = sqlWithRetry.Exec(context.Background(), "", "delete from test.t1 where id = ?", 2)
require.Regexp(t, ".*context canceled", err.Error())

mock.ExpectExec("delete from").WillReturnResult(sqlmock.NewResult(0, 1))
err = sqlWithRetry.Exec(context.Background(), "", "delete from test.t1 where id = ?", 2)
require.NoError(t, err)

require.Nil(t, mock.ExpectationsWereMet())
}

func TestInterpolateMySQLString(t *testing.T) {
assert.Equal(t, "'123'", common.InterpolateMySQLString("123"))
assert.Equal(t, "'1''23'", common.InterpolateMySQLString("1'23"))
assert.Equal(t, "'1''2''''3'", common.InterpolateMySQLString("1'2''3"))
}

func TestGetAutoRandomColumn(t *testing.T) {
tests := []struct {
ddl string
colName string
}{
{"create table t(c int)", ""},
{"create table t(c int auto_increment)", ""},
{"create table t(c bigint auto_random primary key)", "c"},
{"create table t(a int, c bigint auto_random primary key)", "c"},
{"create table t(c bigint auto_random, a int, primary key(c,a))", "c"},
{"create table t(a int, c bigint auto_random, primary key(c,a))", "c"},
}
p := parser.New()
for _, tt := range tests {
tableInfo, err := dbutiltest.GetTableInfoBySQL(tt.ddl, p)
require.NoError(t, err)
col := common.GetAutoRandomColumn(tableInfo)
if tt.colName == "" {
require.Nil(t, col, tt.ddl)
} else {
require.Equal(t, tt.colName, col.Name.L, tt.ddl)
}
}
}

func TestBuildAddIndexSQL(t *testing.T) {
tests := []struct {
table string
current string
desired string
singleSQL string
multiSQLs []string
}{
{
table: "`test`.`non_pk_auto_inc`",
current: `CREATE TABLE non_pk_auto_inc (
pk varchar(255),
id int(11) NOT NULL AUTO_INCREMENT,
UNIQUE KEY uniq_id (id)
)`,
desired: `CREATE TABLE non_pk_auto_inc (
pk varchar(255) PRIMARY KEY NONCLUSTERED,
id int(11) NOT NULL AUTO_INCREMENT,
UNIQUE KEY uniq_id (id)
)`,
singleSQL: "ALTER TABLE `test`.`non_pk_auto_inc` ADD PRIMARY KEY (`pk`)",
multiSQLs: []string{"ALTER TABLE `test`.`non_pk_auto_inc` ADD PRIMARY KEY (`pk`)"},
},
{
table: "`test`.`multi_indexes`",
current: `
CREATE TABLE multi_indexes (
c1 bigint PRIMARY KEY CLUSTERED,
c2 varchar(255) NOT NULL,
c3 varchar(255) NOT NULL,
c4 varchar(255) NOT NULL,
c5 varchar(255) NOT NULL,
c6 varchar(255) NOT NULL,
c7 varchar(255) NOT NULL,
c8 varchar(255) NOT NULL,
c9 varchar(255) NOT NULL,
c10 varchar(255) NOT NULL,
c11 varchar(255) NOT NULL
)
`,
desired: `
CREATE TABLE multi_indexes (
c1 bigint PRIMARY KEY CLUSTERED,
c2 varchar(255) NOT NULL UNIQUE KEY,
c3 varchar(255) NOT NULL,
c4 varchar(255) NOT NULL,
c5 varchar(255) NOT NULL,
c6 varchar(255) NOT NULL,
c7 varchar(255) NOT NULL,
c8 varchar(255) NOT NULL,
c9 varchar(255) NOT NULL,
c10 varchar(255) NOT NULL,
c11 varchar(255) NOT NULL,
INDEX idx_c2 (c2) COMMENT 'single column index',
INDEX idx_c2_c3(c2, c3) COMMENT 'multiple column index',
UNIQUE KEY uniq_c4 (c4) COMMENT 'single column unique key',
UNIQUE KEY uniq_c4_c5 (c4, c5) COMMENT 'multiple column unique key',
INDEX idx_c6 (c6 ASC) COMMENT 'single column index with asc order',
INDEX idx_c7 (c7 DESC) COMMENT 'single column index with desc order',
INDEX idx_c6_c7 (c6 ASC, c7 DESC) COMMENT 'multiple column index with asc and desc order',
INDEX idx_c8 (c8) VISIBLE COMMENT 'single column index with visible',
INDEX idx_c9 (c9) INVISIBLE COMMENT 'single column index with invisible',
INDEX idx_lower_c10 ((lower(c10))) COMMENT 'single column index with function',
INDEX idx_prefix_c11 (c11(3)) COMMENT 'single column index with prefix'
);`,
singleSQL: "ALTER TABLE `test`.`multi_indexes` ADD KEY `idx_c2`(`c2`) COMMENT 'single column index'" +
", ADD KEY `idx_c2_c3`(`c2`,`c3`) COMMENT 'multiple column index'" +
", ADD UNIQUE KEY `uniq_c4`(`c4`) COMMENT 'single column unique key'" +
", ADD UNIQUE KEY `uniq_c4_c5`(`c4`,`c5`) COMMENT 'multiple column unique key'" +
", ADD KEY `idx_c6`(`c6`) COMMENT 'single column index with asc order'" +
", ADD KEY `idx_c7`(`c7`) COMMENT 'single column index with desc order'" +
", ADD KEY `idx_c6_c7`(`c6`,`c7`) COMMENT 'multiple column index with asc and desc order'" +
", ADD KEY `idx_c8`(`c8`) COMMENT 'single column index with visible'" +
", ADD KEY `idx_c9`(`c9`) INVISIBLE COMMENT 'single column index with invisible'" +
", ADD KEY `idx_lower_c10`((lower(`c10`))) COMMENT 'single column index with function'" +
", ADD KEY `idx_prefix_c11`(`c11`(3)) COMMENT 'single column index with prefix'" +
", ADD UNIQUE KEY `c2`(`c2`)",
multiSQLs: []string{
"ALTER TABLE `test`.`multi_indexes` ADD KEY `idx_c2`(`c2`) COMMENT 'single column index'",
"ALTER TABLE `test`.`multi_indexes` ADD KEY `idx_c2_c3`(`c2`,`c3`) COMMENT 'multiple column index'",
"ALTER TABLE `test`.`multi_indexes` ADD UNIQUE KEY `uniq_c4`(`c4`) COMMENT 'single column unique key'",
"ALTER TABLE `test`.`multi_indexes` ADD UNIQUE KEY `uniq_c4_c5`(`c4`,`c5`) COMMENT 'multiple column unique key'",
"ALTER TABLE `test`.`multi_indexes` ADD KEY `idx_c6`(`c6`) COMMENT 'single column index with asc order'",
"ALTER TABLE `test`.`multi_indexes` ADD KEY `idx_c7`(`c7`) COMMENT 'single column index with desc order'",
"ALTER TABLE `test`.`multi_indexes` ADD KEY `idx_c6_c7`(`c6`,`c7`)" +
" COMMENT 'multiple column index with asc and desc order'",
"ALTER TABLE `test`.`multi_indexes` ADD KEY `idx_c8`(`c8`) COMMENT 'single column index with visible'",
"ALTER TABLE `test`.`multi_indexes` ADD KEY `idx_c9`(`c9`) INVISIBLE COMMENT 'single column index with invisible'",
"ALTER TABLE `test`.`multi_indexes` ADD KEY `idx_lower_c10`((lower(`c10`)))" +
" COMMENT 'single column index with function'",
"ALTER TABLE `test`.`multi_indexes` ADD KEY `idx_prefix_c11`(`c11`(3)) COMMENT 'single column index with prefix'",
"ALTER TABLE `test`.`multi_indexes` ADD UNIQUE KEY `c2`(`c2`)",
},
}}

p := parser.New()

for _, tt := range tests {
curTblInfo, err := dbutiltest.GetTableInfoBySQL(tt.current, p)
require.NoError(t, err)
desiredTblInfo, err := dbutiltest.GetTableInfoBySQL(tt.desired, p)
require.NoError(t, err)

singleSQL, multiSQLs := common.BuildAddIndexSQL(tt.table, curTblInfo, desiredTblInfo)
require.Equal(t, tt.singleSQL, singleSQL)
require.Equal(t, tt.multiSQLs, multiSQLs)
}
}

0 comments on commit e937912

Please sign in to comment.