Skip to content

Commit

Permalink
Merge branch 'master' into test-tiflash-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
YuJuncen authored Aug 25, 2022
2 parents 40eb204 + 87a6106 commit 1ddeb4f
Show file tree
Hide file tree
Showing 226 changed files with 4,672 additions and 1,121 deletions.
21 changes: 12 additions & 9 deletions .bazelrc
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
startup --host_jvm_args=-Xmx8g
startup --unlimit_coredumps

run:ci --color=yes

build --announce_rc
build --java_language_version=17
build --java_runtime_version=17
build --tool_java_language_version=17
build --tool_java_runtime_version=17
build --incompatible_strict_action_env --incompatible_enable_cc_toolchain_resolution
build:ci --color=yes
run:ci --color=yes

build:ci --experimental_remote_cache_compression
build:release --workspace_status_command=./build/print-workspace-status.sh --stamp
build:release --config=ci
build --incompatible_strict_action_env --incompatible_enable_cc_toolchain_resolution
build:ci --experimental_remote_cache_compression
test:ci --color=yes
test:ci --verbose_failures
test:ci --test_env=GO_TEST_WRAP_TESTV=1 --test_verbose_timeout_warnings
test:ci --test_env=TZ=Asia/Shanghai --test_output=errors --experimental_ui_max_stdouterr_bytes=104857600

build:race --config=ci
build:race --@io_bazel_rules_go//go/config:race --test_env=GORACE=halt_on_error=1 --test_sharding_strategy=disabled

test --test_env=TZ=Asia/Shanghai
test --test_output=errors --test_summary=testcase
test:ci --color=yes
test:ci --verbose_failures --test_verbose_timeout_warnings
test:ci --test_env=GO_TEST_WRAP_TESTV=1
test:ci --experimental_ui_max_stdouterr_bytes=104857600
test:race --test_timeout=1200,6000,18000,72000

try-import /data/bazel
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3366,8 +3366,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:WFR3seA8YtBhDn47YJSW1P1/lwBIXsk0vALnRVuaL/M=",
version = "v2.0.1-0.20220815094724-025596b7a20a",
sum = "h1:/nr7P8uzJQ7u3wPEBHCokrsVmuDvi/1x/zI/ydk5n8U=",
version = "v2.0.1-0.20220818084834-0d0ae0dcfb1f",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -460,3 +460,7 @@ bazel_statisticstest: failpoint-enable bazel_ci_prepare
bazel_txntest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv \
-- //tests/realtikvtest/txntest/...

bazel_addindextest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv \
-- //tests/realtikvtest/addindextest/...
1 change: 1 addition & 0 deletions bindinfo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ go_test(
],
embed = [":bindinfo"],
flaky = True,
shard_count = 50,
deps = [
"//config",
"//domain",
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,16 @@ func newSession(options *SessionOptions, logger log.Logger) *session {
vars.SQLMode = sqlMode
if options.SysVars != nil {
for k, v := range options.SysVars {
// since 6.3(current master) tidb checks whether we can set a system variable
// lc_time_names is a read-only variable for now, but might be implemented later,
// so we not remove it from defaultImportantVariables and check it in below way.
if sv := variable.GetSysVar(k); sv == nil {
logger.DPanic("unknown system var", zap.String("key", k))
continue
} else if sv.ReadOnly {
logger.Debug("skip read-only variable", zap.String("key", k))
continue
}
if err := vars.SetSystemVar(k, v); err != nil {
logger.DPanic("new session: failed to set system var",
log.ShortError(err),
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ go_library(
"//parser/mysql",
"//table",
"//tablecodec",
"//types",
"//util/codec",
"//util/engine",
"//util/hack",
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/engine"
"github.com/pingcap/tidb/util/mathutil"
Expand Down Expand Up @@ -1693,6 +1694,10 @@ func (local *local) ResolveDuplicateRows(ctx context.Context, tbl table.Table, t
if err == nil {
return nil
}
if types.ErrBadNumber.Equal(err) {
logger.Warn("delete duplicate rows encounter error", log.ShortError(err))
return common.ErrResolveDuplicateRows.Wrap(err).GenWithStackByArgs(tableName)
}
if log.IsContextCanceledError(err) {
return err
}
Expand Down
27 changes: 14 additions & 13 deletions br/pkg/lightning/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,20 @@ var (
ErrKVIngestFailed = errors.Normalize("ingest tikv failed", errors.RFCCodeText("Lightning:KV:ErrKVIngestFailed"))
ErrKVRaftProposalDropped = errors.Normalize("raft proposal dropped", errors.RFCCodeText("Lightning:KV:ErrKVRaftProposalDropped"))

ErrUnknownBackend = errors.Normalize("unknown backend %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownBackend"))
ErrCheckLocalFile = errors.Normalize("cannot find local file for table: %s engineDir: %s", errors.RFCCodeText("Lightning:Restore:ErrCheckLocalFile"))
ErrOpenDuplicateDB = errors.Normalize("open duplicate db error", errors.RFCCodeText("Lightning:Restore:ErrOpenDuplicateDB"))
ErrSchemaNotExists = errors.Normalize("table `%s`.`%s` schema not found", errors.RFCCodeText("Lightning:Restore:ErrSchemaNotExists"))
ErrInvalidSchemaStmt = errors.Normalize("invalid schema statement: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidSchemaStmt"))
ErrCreateSchema = errors.Normalize("create schema failed, table: %s, stmt: %s", errors.RFCCodeText("Lightning:Restore:ErrCreateSchema"))
ErrUnknownColumns = errors.Normalize("unknown columns in header (%s) for table %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownColumns"))
ErrChecksumMismatch = errors.Normalize("checksum mismatched remote vs local => (checksum: %d vs %d) (total_kvs: %d vs %d) (total_bytes:%d vs %d)", errors.RFCCodeText("Lighting:Restore:ErrChecksumMismatch"))
ErrRestoreTable = errors.Normalize("restore table %s failed", errors.RFCCodeText("Lightning:Restore:ErrRestoreTable"))
ErrEncodeKV = errors.Normalize("encode kv error in file %s at offset %d", errors.RFCCodeText("Lightning:Restore:ErrEncodeKV"))
ErrAllocTableRowIDs = errors.Normalize("allocate table row id error", errors.RFCCodeText("Lightning:Restore:ErrAllocTableRowIDs"))
ErrInvalidMetaStatus = errors.Normalize("invalid meta status: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidMetaStatus"))
ErrTableIsChecksuming = errors.Normalize("table '%s' is checksuming", errors.RFCCodeText("Lightning:Restore:ErrTableIsChecksuming"))
ErrUnknownBackend = errors.Normalize("unknown backend %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownBackend"))
ErrCheckLocalFile = errors.Normalize("cannot find local file for table: %s engineDir: %s", errors.RFCCodeText("Lightning:Restore:ErrCheckLocalFile"))
ErrOpenDuplicateDB = errors.Normalize("open duplicate db error", errors.RFCCodeText("Lightning:Restore:ErrOpenDuplicateDB"))
ErrSchemaNotExists = errors.Normalize("table `%s`.`%s` schema not found", errors.RFCCodeText("Lightning:Restore:ErrSchemaNotExists"))
ErrInvalidSchemaStmt = errors.Normalize("invalid schema statement: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidSchemaStmt"))
ErrCreateSchema = errors.Normalize("create schema failed, table: %s, stmt: %s", errors.RFCCodeText("Lightning:Restore:ErrCreateSchema"))
ErrUnknownColumns = errors.Normalize("unknown columns in header (%s) for table %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownColumns"))
ErrChecksumMismatch = errors.Normalize("checksum mismatched remote vs local => (checksum: %d vs %d) (total_kvs: %d vs %d) (total_bytes:%d vs %d)", errors.RFCCodeText("Lighting:Restore:ErrChecksumMismatch"))
ErrRestoreTable = errors.Normalize("restore table %s failed", errors.RFCCodeText("Lightning:Restore:ErrRestoreTable"))
ErrEncodeKV = errors.Normalize("encode kv error in file %s at offset %d", errors.RFCCodeText("Lightning:Restore:ErrEncodeKV"))
ErrAllocTableRowIDs = errors.Normalize("allocate table row id error", errors.RFCCodeText("Lightning:Restore:ErrAllocTableRowIDs"))
ErrInvalidMetaStatus = errors.Normalize("invalid meta status: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidMetaStatus"))
ErrTableIsChecksuming = errors.Normalize("table '%s' is checksuming", errors.RFCCodeText("Lightning:Restore:ErrTableIsChecksuming"))
ErrResolveDuplicateRows = errors.Normalize("resolve duplicate rows error on table '%s'", errors.RFCCodeText("Lightning:Restore:ErrResolveDuplicateRows"))
)

type withStack struct {
Expand Down
68 changes: 50 additions & 18 deletions br/pkg/stream/rewrite_meta_rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ type DBReplace struct {

type SchemasReplace struct {
DbMap map[OldID]*DBReplace
RewriteTS uint64
TableFilter filter.Filter
globalTableIdMap map[OldID]NewID
RewriteTS uint64 // used to rewrite commit ts in meta kv.
TableFilter filter.Filter // used to filter schema/table
genGenGlobalID func(ctx context.Context) (int64, error)
genGenGlobalIDs func(ctx context.Context, n int) ([]int64, error)
insertDeleteRangeForTable func(jobID int64, tableIDs []int64)
Expand Down Expand Up @@ -94,8 +95,19 @@ func NewSchemasReplace(
insertDeleteRangeForTable func(jobID int64, tableIDs []int64),
insertDeleteRangeForIndex func(jobID int64, elementID *int64, tableID int64, indexIDs []int64),
) *SchemasReplace {
globalTableIdMap := make(map[OldID]NewID)
for _, dr := range dbMap {
for tblID, tr := range dr.TableMap {
globalTableIdMap[tblID] = tr.NewTableID
for oldpID, newpID := range tr.PartitionMap {
globalTableIdMap[oldpID] = newpID
}
}
}

return &SchemasReplace{
DbMap: dbMap,
globalTableIdMap: globalTableIdMap,
RewriteTS: restoreTS,
TableFilter: tableFilter,
genGenGlobalID: genID,
Expand Down Expand Up @@ -201,6 +213,11 @@ func (sr *SchemasReplace) rewriteKeyForTable(
parseField func([]byte) (tableID int64, err error),
encodeField func(tableID int64) []byte,
) ([]byte, bool, error) {
var (
err error
newID int64
exist bool
)
rawMetaKey, err := ParseTxnMetaKeyFrom(key)
if err != nil {
return nil, false, errors.Trace(err)
Expand All @@ -218,7 +235,7 @@ func (sr *SchemasReplace) rewriteKeyForTable(

dbReplace, exist := sr.DbMap[dbID]
if !exist {
newID, err := sr.genGenGlobalID(context.Background())
newID, err = sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
}
Expand All @@ -228,9 +245,13 @@ func (sr *SchemasReplace) rewriteKeyForTable(

tableReplace, exist := dbReplace.TableMap[tableID]
if !exist {
newID, err := sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
newID, exist = sr.globalTableIdMap[tableID]
if !exist {
newID, err = sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
}
sr.globalTableIdMap[tableID] = newID
}
tableReplace = NewTableReplace(nil, newID)
dbReplace.TableMap[tableID] = tableReplace
Expand All @@ -245,15 +266,20 @@ func (sr *SchemasReplace) rewriteKeyForTable(
}

func (sr *SchemasReplace) rewriteTableInfo(value []byte, dbID int64) ([]byte, bool, error) {
var tableInfo model.TableInfo
var (
tableInfo model.TableInfo
err error
newID int64
exist bool
)
if err := json.Unmarshal(value, &tableInfo); err != nil {
return nil, false, errors.Trace(err)
}

// update table ID
dbReplace, exist := sr.DbMap[dbID]
if !exist {
newID, err := sr.genGenGlobalID(context.Background())
newID, err = sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
}
Expand All @@ -263,10 +289,15 @@ func (sr *SchemasReplace) rewriteTableInfo(value []byte, dbID int64) ([]byte, bo

tableReplace, exist := dbReplace.TableMap[tableInfo.ID]
if !exist {
newID, err := sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
newID, exist = sr.globalTableIdMap[tableInfo.ID]
if !exist {
newID, err = sr.genGenGlobalID(context.TODO())
if err != nil {
return nil, false, errors.Trace(err)
}
sr.globalTableIdMap[tableInfo.ID] = newID
}

tableReplace = NewTableReplace(&tableInfo, newID)
dbReplace.TableMap[tableInfo.ID] = tableReplace
} else {
Expand All @@ -286,12 +317,15 @@ func (sr *SchemasReplace) rewriteTableInfo(value []byte, dbID int64) ([]byte, bo
partitions := newTableInfo.GetPartitionInfo()
if partitions != nil {
for i, tbl := range partitions.Definitions {
newID, exist := tableReplace.PartitionMap[tbl.ID]
newID, exist = tableReplace.PartitionMap[tbl.ID]
if !exist {
var err error
newID, err = sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
newID, exist = sr.globalTableIdMap[tbl.ID]
if !exist {
newID, err = sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
}
sr.globalTableIdMap[tbl.ID] = newID
}
tableReplace.PartitionMap[tbl.ID] = newID
}
Expand Down Expand Up @@ -522,8 +556,6 @@ func (sr *SchemasReplace) tryToGCJob(job *model.Job) error {
return err
}
}
case model.ActionExchangeTablePartition:
return errors.Errorf("restore of ddl `exchange-table-partition` is not supported")
}
}
return nil
Expand Down
Loading

0 comments on commit 1ddeb4f

Please sign in to comment.