Skip to content

Commit

Permalink
Merge branch 'master' into micali
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Dec 10, 2022
2 parents 6d5e10d + 3884b28 commit 7e4b2d2
Show file tree
Hide file tree
Showing 33 changed files with 12,209 additions and 10,194 deletions.
5 changes: 3 additions & 2 deletions ddl/index_merge_tmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,9 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor
unique := false
length := len(rawValue)
keyVer := rawValue[length-1]
if keyVer == tables.TempIndexKeyTypeMerge {
// The kv is written in the merging state. It has been written to the origin index, we can skip it.
if keyVer == tables.TempIndexKeyTypeMerge || keyVer == tables.TempIndexKeyTypeDelete {
// For 'm' version kvs, they are double-written.
// For 'd' version kvs, they are written in the delete-only state and can be dropped safely.
return true, nil
}
rawValue = rawValue[:length-1]
Expand Down
45 changes: 45 additions & 0 deletions ddl/index_merge_tmp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,48 @@ func TestCreateUniqueIndexKeyExist(t *testing.T) {
tk.MustExec("admin check table t")
tk.MustQuery("select * from t order by a, b").Check(testkit.Rows("0 9", "1 7", "2 7", "5 7", "8 8", "10 10"))
}

func TestAddIndexMergeIndexUpdateOnDeleteOnly(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk.MustExec(`CREATE TABLE t (a DATE NULL DEFAULT '1619-01-18', b BOOL NULL DEFAULT '0') CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_bin';`)
tk.MustExec(`INSERT INTO t SET b = '1';`)

updateSQLs := []string{
"UPDATE t SET a = '9432-05-10', b = '0';",
"UPDATE t SET a = '9432-05-10', b = '1';",
}

// Force onCreateIndex use the txn-merge process.
ingest.LitInitialized = false
tk.MustExec("set @@global.tidb_ddl_enable_fast_reorg = 1;")
tk.MustExec("set @@global.tidb_enable_mutation_checker = 1;")
tk.MustExec("set @@global.tidb_txn_assertion_level = 'STRICT';")

var checkErrs []error
originHook := dom.DDL().GetHook()
callback := &ddl.TestDDLCallback{
Do: dom,
}
onJobUpdatedBefore := func(job *model.Job) {
if job.SchemaState == model.StateDeleteOnly {
for _, sql := range updateSQLs {
_, err := tk2.Exec(sql)
if err != nil {
checkErrs = append(checkErrs, err)
}
}
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedBefore)
dom.DDL().SetHook(callback)
tk.MustExec("alter table t add index idx(b);")
dom.DDL().SetHook(originHook)
for _, err := range checkErrs {
require.NoError(t, err)
}
tk.MustExec("admin check table t;")
}
1 change: 1 addition & 0 deletions dumpling/export/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ go_library(
"@com_github_coreos_go_semver//semver",
"@com_github_docker_go_units//:go-units",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
Expand Down
7 changes: 7 additions & 0 deletions dumpling/export/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/pflag"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -144,6 +145,9 @@ type Config struct {
PromFactory promutil.Factory `json:"-"`
PromRegistry promutil.Registry `json:"-"`
ExtStorage storage.ExternalStorage `json:"-"`

IOTotalBytes *atomic.Uint64
Net string
}

// ServerInfoUnknown is the unknown database type to dumpling
Expand Down Expand Up @@ -212,6 +216,9 @@ func (conf *Config) GetDriverConfig(db string) *mysql.Config {
driverCfg.User = conf.User
driverCfg.Passwd = conf.Password
driverCfg.Net = "tcp"
if conf.Net != "" {
driverCfg.Net = conf.Net
}
driverCfg.Addr = hostPort
driverCfg.DBName = db
driverCfg.Collation = "utf8mb4_general_ci"
Expand Down
31 changes: 31 additions & 0 deletions dumpling/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
"encoding/hex"
"fmt"
"math/big"
"net"
"strconv"
"strings"
"sync/atomic"
"time"

// import mysql driver
"github.com/go-sql-driver/mysql"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
pclog "github.com/pingcap/log"
Expand All @@ -31,8 +33,10 @@ import (
"github.com/pingcap/tidb/parser/format"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
pd "github.com/tikv/pd/client"
gatomic "go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -101,6 +105,17 @@ func NewDumper(ctx context.Context, conf *Config) (*Dumper, error) {
if err != nil {
return nil, err
}
failpoint.Inject("SetIOTotalBytes", func(_ failpoint.Value) {
d.conf.IOTotalBytes = gatomic.NewUint64(0)
d.conf.Net = uuid.New().String()
go func() {
for {
time.Sleep(10 * time.Millisecond)
d.tctx.L().Logger.Info("IOTotalBytes", zap.Uint64("IOTotalBytes", d.conf.IOTotalBytes.Load()))
}
}()
})

err = runSteps(d,
initLogger,
createExternalStore,
Expand Down Expand Up @@ -1330,6 +1345,22 @@ func startHTTPService(d *Dumper) error {

// openSQLDB is an initialization step of Dumper.
func openSQLDB(d *Dumper) error {
if d.conf.IOTotalBytes != nil {
mysql.RegisterDialContext(d.conf.Net, func(ctx context.Context, addr string) (net.Conn, error) {
dial := &net.Dialer{}
conn, err := dial.DialContext(ctx, "tcp", addr)
if err != nil {
return nil, err
}
tcpConn := conn.(*net.TCPConn)
// try https://github.com/go-sql-driver/mysql/blob/bcc459a906419e2890a50fc2c99ea6dd927a88f2/connector.go#L56-L64
err = tcpConn.SetKeepAlive(true)
if err != nil {
d.tctx.L().Logger.Warn("fail to keep alive", zap.Error(err))
}
return util.NewTCPConnWithIOCounter(tcpConn, d.conf.IOTotalBytes), nil
})
}
conf := d.conf
c, err := mysql.NewConnector(conf.GetDriverConfig(""))
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions dumpling/tests/basic/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,14 @@ run_dumpling --consistency lock -B "$DB_NAME" -L ${DUMPLING_OUTPUT_DIR}/dumpling
cnt=$(grep -w "$DB_NAME" ${DUMPLING_OUTPUT_DIR}/${DB_NAME}-schema-create.sql|wc -l)
echo "records count is ${cnt}"
[ "$cnt" = 1 ]

# Test for recording network usage
run_sql "drop database if exists test_db;"
run_sql "create database test_db;"
run_sql "create table test_db.test_table (a int primary key);"
run_sql "insert into test_db.test_table values (1),(2),(3),(4),(5),(6),(7),(8);"

export GO_FAILPOINTS="github.com/pingcap/tidb/dumpling/export/SetIOTotalBytes=return(1)"
run_dumpling -B "test_db" -L ${DUMPLING_OUTPUT_DIR}/dumpling.log
cnt=$(grep "IOTotalBytes=" ${DUMPLING_OUTPUT_DIR}/dumpling.log | grep -v "IOTotalBytes=0" | wc -l)
[ "$cnt" -ge 1 ]
1 change: 1 addition & 0 deletions errno/errcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,7 @@ const (
ErrFunctionalIndexDataIsTooLong = 3907
ErrFunctionalIndexNotApplicable = 3909
ErrDynamicPrivilegeNotRegistered = 3929
ErUserAccessDeniedForUserAccountBlockedByPasswordLock = 3955
ErrTableWithoutPrimaryKey = 3750
// MariaDB errors.
ErrOnlyOneDefaultPartionAllowed = 4030
Expand Down
1 change: 1 addition & 0 deletions errno/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrDependentByGeneratedColumn: mysql.Message("Column '%s' has a generated column dependency.", nil),
ErrGeneratedColumnRefAutoInc: mysql.Message("Generated column '%s' cannot refer to auto-increment column.", nil),
ErrAccountHasBeenLocked: mysql.Message("Access denied for user '%s'@'%s'. Account is locked.", nil),
ErUserAccessDeniedForUserAccountBlockedByPasswordLock: mysql.Message("Access denied for user '%s'@'%s'. Account is blocked for %s day(s) (%s day(s) remaining) due to %d consecutive failed logins.", nil),
ErrWarnConflictingHint: mysql.Message("Hint %s is ignored as conflicting/duplicated.", nil),
ErrUnresolvedHintName: mysql.Message("Unresolved name '%s' for %s hint", nil),
ErrForeignKeyCascadeDepthExceeded: mysql.Message("Foreign key cascade delete/update exceeds max depth of %v.", nil),
Expand Down
2 changes: 1 addition & 1 deletion executor/grant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func TestMaintainRequire(t *testing.T) {

// test show create user
tk.MustExec(`CREATE USER 'u3'@'%' require issuer '/CN=TiDB admin/OU=TiDB/O=PingCAP/L=San Francisco/ST=California/C=US' subject '/CN=tester1/OU=TiDB/O=PingCAP.Inc/L=Haidian/ST=Beijing/C=ZH' cipher 'AES128-GCM-SHA256'`)
tk.MustQuery("show create user 'u3'").Check(testkit.Rows("CREATE USER 'u3'@'%' IDENTIFIED WITH 'mysql_native_password' AS '' REQUIRE CIPHER 'AES128-GCM-SHA256' ISSUER '/CN=TiDB admin/OU=TiDB/O=PingCAP/L=San Francisco/ST=California/C=US' SUBJECT '/CN=tester1/OU=TiDB/O=PingCAP.Inc/L=Haidian/ST=Beijing/C=ZH' PASSWORD EXPIRE DEFAULT ACCOUNT UNLOCK PASSWORD HISTORY DEFALUT PASSWORD REUSE INTERVAL DEFALUT"))
tk.MustQuery("show create user 'u3'").Check(testkit.Rows("CREATE USER 'u3'@'%' IDENTIFIED WITH 'mysql_native_password' AS '' REQUIRE CIPHER 'AES128-GCM-SHA256' ISSUER '/CN=TiDB admin/OU=TiDB/O=PingCAP/L=San Francisco/ST=California/C=US' SUBJECT '/CN=tester1/OU=TiDB/O=PingCAP.Inc/L=Haidian/ST=Beijing/C=ZH' PASSWORD EXPIRE DEFAULT ACCOUNT UNLOCK PASSWORD HISTORY DEFAULT PASSWORD REUSE INTERVAL DEFAULT"))

// check issuer/subject/cipher value
err := tk.ExecToErr(`CREATE USER 'u4'@'%' require issuer 'CN=TiDB,OU=PingCAP'`)
Expand Down
28 changes: 22 additions & 6 deletions executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -1513,7 +1513,10 @@ func (e *ShowExec) fetchShowCreateUser(ctx context.Context) error {
exec := e.ctx.(sqlexec.RestrictedSQLExecutor)

rows, _, err := exec.ExecRestrictedSQL(ctx, nil,
`SELECT plugin, Account_locked, JSON_UNQUOTE(JSON_EXTRACT(user_attributes, '$.metadata')), Token_issuer, Password_reuse_history, Password_reuse_time, Password_expired, Password_lifetime
`SELECT plugin, Account_locked, user_attributes->>'$.metadata', Token_issuer,
Password_reuse_history, Password_reuse_time, Password_expired, Password_lifetime,
user_attributes->>'$.Password_locking.failed_login_attempts',
user_attributes->>'$.Password_locking.password_lock_time_days'
FROM %n.%n WHERE User=%? AND Host=%?`,
mysql.SystemDB, mysql.UserTable, userName, strings.ToLower(hostName))
if err != nil {
Expand All @@ -1539,7 +1542,7 @@ func (e *ShowExec) fetchShowCreateUser(ctx context.Context) error {

userAttributes := rows[0].GetString(2)
if len(userAttributes) > 0 {
userAttributes = " ATTRIBUTE " + userAttributes
userAttributes = fmt.Sprintf(" ATTRIBUTE '%s'", userAttributes)
}

tokenIssuer := rows[0].GetString(3)
Expand All @@ -1549,14 +1552,14 @@ func (e *ShowExec) fetchShowCreateUser(ctx context.Context) error {

var passwordHistory string
if rows[0].IsNull(4) {
passwordHistory = "DEFALUT"
passwordHistory = "DEFAULT"
} else {
passwordHistory = strconv.FormatUint(rows[0].GetUint64(4), 10)
}

var passwordReuseInterval string
if rows[0].IsNull(5) {
passwordReuseInterval = "DEFALUT"
passwordReuseInterval = "DEFAULT"
} else {
passwordReuseInterval = strconv.FormatUint(rows[0].GetUint64(5), 10) + " DAY"
}
Expand All @@ -1575,6 +1578,19 @@ func (e *ShowExec) fetchShowCreateUser(ctx context.Context) error {
passwordExpiredStr = fmt.Sprintf("PASSWORD EXPIRE INTERVAL %d DAY", passwordLifetime)
}

failedLoginAttempts := rows[0].GetString(8)
if len(failedLoginAttempts) > 0 {
failedLoginAttempts = " FAILED_LOGIN_ATTEMPTS " + failedLoginAttempts
}

passwordLockTimeDays := rows[0].GetString(9)
if len(passwordLockTimeDays) > 0 {
if passwordLockTimeDays == "-1" {
passwordLockTimeDays = " PASSWORD_LOCK_TIME UNBOUNDED"
} else {
passwordLockTimeDays = " PASSWORD_LOCK_TIME " + passwordLockTimeDays
}
}
rows, _, err = exec.ExecRestrictedSQL(ctx, nil, `SELECT Priv FROM %n.%n WHERE User=%? AND Host=%?`, mysql.SystemDB, mysql.GlobalPrivTable, userName, hostName)
if err != nil {
return errors.Trace(err)
Expand All @@ -1598,8 +1614,8 @@ func (e *ShowExec) fetchShowCreateUser(ctx context.Context) error {
}

// FIXME: the returned string is not escaped safely
showStr := fmt.Sprintf("CREATE USER '%s'@'%s' IDENTIFIED WITH '%s'%s REQUIRE %s%s %s ACCOUNT %s%s PASSWORD HISTORY %s PASSWORD REUSE INTERVAL %s",
e.User.Username, e.User.Hostname, authplugin, authStr, require, tokenIssuer, passwordExpiredStr, accountLocked, userAttributes, passwordHistory, passwordReuseInterval)
showStr := fmt.Sprintf("CREATE USER '%s'@'%s' IDENTIFIED WITH '%s'%s REQUIRE %s%s %s ACCOUNT %s PASSWORD HISTORY %s PASSWORD REUSE INTERVAL %s%s%s%s",
e.User.Username, e.User.Hostname, authplugin, authStr, require, tokenIssuer, passwordExpiredStr, accountLocked, passwordHistory, passwordReuseInterval, failedLoginAttempts, passwordLockTimeDays, userAttributes)
e.appendRow([]interface{}{showStr})
return nil
}
Expand Down
Loading

0 comments on commit 7e4b2d2

Please sign in to comment.