Commit
Merge branch 'intersect_index_merge' of github.com:guo-shaoge/tidb into intersect_index_merge
guo-shaoge committed Nov 29, 2022
2 parents 3e5414e + 452c5f7 commit 9233461
Showing 124 changed files with 3,118 additions and 1,571 deletions.
14 changes: 7 additions & 7 deletions Makefile
@@ -415,7 +415,7 @@ bazel_coverage_test: failpoint-enable bazel_ci_prepare

bazel_build: bazel_ci_prepare
mkdir -p bin
-	bazel $(BAZEL_GLOBAL_CONFIG) build $(BAZEL_CMD_CONFIG) \
+	bazel $(BAZEL_GLOBAL_CONFIG) build $(BAZEL_CMD_CONFIG) --remote_download_minimal \
//... --//build:with_nogo_flag=true
bazel $(BAZEL_GLOBAL_CONFIG) build $(BAZEL_CMD_CONFIG) \
//cmd/importer:importer //tidb-server:tidb-server //tidb-server:tidb-server-check --//build:with_nogo_flag=true
@@ -442,27 +442,27 @@ bazel_golangcilinter:
-- run $$($(PACKAGE_DIRECTORIES)) --config ./.golangci.yaml

bazel_brietest: failpoint-enable bazel_ci_prepare
-	bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv \
+	bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --remote_download_minimal --test_arg=-with-real-tikv \
-- //tests/realtikvtest/brietest/...

bazel_pessimistictest: failpoint-enable bazel_ci_prepare
-	bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv \
+	bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --remote_download_minimal --test_arg=-with-real-tikv \
-- //tests/realtikvtest/pessimistictest/...

bazel_sessiontest: failpoint-enable bazel_ci_prepare
-	bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv \
+	bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --remote_download_minimal --test_arg=-with-real-tikv \
-- //tests/realtikvtest/sessiontest/...

bazel_statisticstest: failpoint-enable bazel_ci_prepare
-	bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv \
+	bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --remote_download_minimal --test_arg=-with-real-tikv \
-- //tests/realtikvtest/statisticstest/...

bazel_txntest: failpoint-enable bazel_ci_prepare
-	bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv \
+	bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --remote_download_minimal --test_arg=-with-real-tikv \
-- //tests/realtikvtest/txntest/...

bazel_addindextest: failpoint-enable bazel_ci_prepare
-	bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv \
+	bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --remote_download_minimal --test_arg=-with-real-tikv \
-- //tests/realtikvtest/addindextest/...

bazel_lint: bazel_prepare
1 change: 1 addition & 0 deletions autoid_service/BUILD.bazel
@@ -11,6 +11,7 @@ go_library(
"//meta",
"//metrics",
"//owner",
"//parser/model",
"//util/logutil",
"//util/mathutil",
"@com_github_pingcap_errors//:errors",
11 changes: 6 additions & 5 deletions autoid_service/autoid.go
@@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
clientv3 "go.etcd.io/etcd/client/v3"
@@ -76,7 +77,7 @@ func (alloc *autoIDValue) alloc4Unsigned(ctx context.Context, store kv.Storage,

ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error {
-		idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).RowID()
+		idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
var err1 error
newBase, err1 = idAcc.Get()
if err1 != nil {
@@ -137,7 +138,7 @@ func (alloc *autoIDValue) alloc4Signed(ctx context.Context,

ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error {
-		idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).RowID()
+		idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
var err1 error
newBase, err1 = idAcc.Get()
if err1 != nil {
@@ -188,7 +189,7 @@ func (alloc *autoIDValue) rebase4Unsigned(ctx context.Context,
startTime := time.Now()
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error {
-		idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).RowID()
+		idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
currentEnd, err1 := idAcc.Get()
if err1 != nil {
return err1
@@ -221,7 +222,7 @@ func (alloc *autoIDValue) rebase4Signed(ctx context.Context, store kv.Storage, d
var newBase, newEnd int64
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error {
-		idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).RowID()
+		idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
currentEnd, err1 := idAcc.Get()
if err1 != nil {
return err1
@@ -451,7 +452,7 @@ func (s *Service) allocAutoID(ctx context.Context, req *autoid.AutoIDRequest) (*
func (alloc *autoIDValue) forceRebase(ctx context.Context, store kv.Storage, dbID, tblID, requiredBase int64, isUnsigned bool) error {
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error {
-		idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).RowID()
+		idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
currentEnd, err1 := idAcc.Get()
if err1 != nil {
return err1
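Note on this file: every call site swaps the shared `RowID()` accessor for `IncrementID(model.TableInfoVersion5)`, so the autoid service reads and bumps a dedicated AUTO_INCREMENT counter for sufficiently new table metadata. A toy sketch of the version gate this accessor presumably applies (the gate and the counter layout are assumptions, not shown in this diff):

```go
package main

import "fmt"

const tableInfoVersion5 = 5 // stands in for model.TableInfoVersion5

// accessors is a toy stand-in for meta.AutoIDAccessors: two counters that
// live under different meta keys in real TiDB.
type accessors struct {
    rowID       int64 // legacy counter shared with implicit row IDs
    incrementID int64 // dedicated AUTO_INCREMENT counter
}

// incrementIDFor sketches the assumed version gate: tables below version 5
// keep sharing the row-ID counter, newer ones get their own.
func (a *accessors) incrementIDFor(tblVer uint16) *int64 {
    if tblVer < tableInfoVersion5 {
        return &a.rowID
    }
    return &a.incrementID
}

func main() {
    a := &accessors{}
    ctr := a.incrementIDFor(tableInfoVersion5)
    *ctr += 4000 // alloc4Signed-style batch: IDs in (0, 4000] go to the caller
    fmt.Println(a.rowID, a.incrementID) // 0 4000 — row IDs stay untouched
}
```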
6 changes: 4 additions & 2 deletions br/pkg/backup/client.go
@@ -290,10 +290,12 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) {
ranges = ranger.FullIntRange(false)
}

+	retRanges := make([]kv.KeyRange, 0, 1+len(tbl.Indices))
kvRanges, err := distsql.TableHandleRangesToKVRanges(nil, []int64{tblID}, tbl.IsCommonHandle, ranges, nil)
if err != nil {
return nil, errors.Trace(err)
}
+	retRanges = kvRanges.AppendSelfTo(retRanges)

for _, index := range tbl.Indices {
if index.State != model.StatePublic {
@@ -304,9 +306,9 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) {
if err != nil {
return nil, errors.Trace(err)
}
-		kvRanges = append(kvRanges, idxRanges...)
+		retRanges = idxRanges.AppendSelfTo(retRanges)
}
-	return kvRanges, nil
+	return retRanges, nil
}

// BuildBackupRangeAndSchema gets KV range and schema of tables.
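The backup path above now receives a partition-aware `*kv.KeyRanges` from `TableHandleRangesToKVRanges` and flattens it with `AppendSelfTo`. A minimal sketch of the wrapper as this commit's call sites use it (`AppendSelfTo` here; `FirstPartitionRange`, `ForEachPartition`, and `TotalRangeNum` in the files below) — the internal layout is an assumption:

```go
package kv

// KeyRange is a half-open key interval [StartKey, EndKey).
type KeyRange struct {
    StartKey, EndKey []byte
}

// KeyRanges groups ranges with one inner slice per partition / physical table.
type KeyRanges struct {
    ranges [][]KeyRange
}

// AppendSelfTo flattens every partition's ranges onto dst and returns it.
func (rs *KeyRanges) AppendSelfTo(dst []KeyRange) []KeyRange {
    for _, p := range rs.ranges {
        dst = append(dst, p...)
    }
    return dst
}

// ForEachPartition invokes fn once per partition slice.
func (rs *KeyRanges) ForEachPartition(fn func([]KeyRange)) {
    for _, p := range rs.ranges {
        fn(p)
    }
}

// TotalRangeNum counts ranges across all partitions.
func (rs *KeyRanges) TotalRangeNum() int {
    n := 0
    for _, p := range rs.ranges {
        n += len(p)
    }
    return n
}

// FirstPartitionRange returns the first partition's ranges, or an empty slice.
func (rs *KeyRanges) FirstPartitionRange() []KeyRange {
    if len(rs.ranges) == 0 {
        return []KeyRange{}
    }
    return rs.ranges[0]
}
```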
2 changes: 1 addition & 1 deletion br/pkg/checksum/executor_test.go
@@ -104,7 +104,7 @@ func TestChecksum(t *testing.T) {
first = false
ranges, err := backup.BuildTableRanges(tableInfo3)
require.NoError(t, err)
-			require.Equalf(t, ranges[:1], req.KeyRanges, "%v", req.KeyRanges)
+			require.Equalf(t, ranges[:1], req.KeyRanges.FirstPartitionRange(), "%v", req.KeyRanges.FirstPartitionRange())
}
return nil
}))
5 changes: 3 additions & 2 deletions br/pkg/errors/errors.go
@@ -83,8 +83,9 @@ var (
ErrStorageInvalidPermission = errors.Normalize("external storage permission", errors.RFCCodeText("BR:ExternalStorage:ErrStorageInvalidPermission"))

// Snapshot restore
-	ErrRestoreTotalKVMismatch = errors.Normalize("restore total tikvs mismatch", errors.RFCCodeText("BR:EBS:ErrRestoreTotalKVMismatch"))
-	ErrRestoreInvalidPeer     = errors.Normalize("restore met a invalid peer", errors.RFCCodeText("BR:EBS:ErrRestoreInvalidPeer"))
+	ErrRestoreTotalKVMismatch   = errors.Normalize("restore total tikvs mismatch", errors.RFCCodeText("BR:EBS:ErrRestoreTotalKVMismatch"))
+	ErrRestoreInvalidPeer       = errors.Normalize("restore met a invalid peer", errors.RFCCodeText("BR:EBS:ErrRestoreInvalidPeer"))
+	ErrRestoreRegionWithoutPeer = errors.Normalize("restore met a region without any peer", errors.RFCCodeText("BR:EBS:ErrRestoreRegionWithoutPeer"))

// Errors reported from TiKV.
ErrKVStorage = errors.Normalize("tikv storage occur I/O error", errors.RFCCodeText("BR:KV:ErrKVStorage"))
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/kv/allocator.go
@@ -34,6 +34,7 @@ type panickingAllocator struct {
func NewPanickingAllocators(base int64) autoid.Allocators {
sharedBase := &base
return autoid.NewAllocators(
+		false,
&panickingAllocator{base: sharedBase, ty: autoid.RowIDAllocType},
&panickingAllocator{base: sharedBase, ty: autoid.AutoIncrementType},
&panickingAllocator{base: sharedBase, ty: autoid.AutoRandomType},
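The extra leading `false` suggests `autoid.NewAllocators` gained a boolean first parameter in this commit. A hedged sketch of the new shape — the parameter name and meaning are inferred from context, not shown in the diff:

```go
package autoid

// Allocator is elided here; it matches autoid.Allocator in TiDB.
type Allocator interface{}

// Allocators bundles the allocators for one table.
type Allocators struct {
    SepAutoInc bool // assumption: whether AUTO_INCREMENT uses a counter separate from row IDs
    Allocs     []Allocator
}

// NewAllocators as this commit appears to call it: a flag first, then the allocators.
func NewAllocators(sepAutoInc bool, allocators ...Allocator) Allocators {
    return Allocators{SepAutoInc: sepAutoInc, Allocs: allocators}
}
```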
66 changes: 30 additions & 36 deletions br/pkg/lightning/backend/local/duplicate.go
@@ -211,7 +211,7 @@ func physicalTableIDs(tableInfo *model.TableInfo) []int64 {
}

// tableHandleKeyRanges returns all key ranges associated with the tableInfo.
-func tableHandleKeyRanges(tableInfo *model.TableInfo) ([]tidbkv.KeyRange, error) {
+func tableHandleKeyRanges(tableInfo *model.TableInfo) (*tidbkv.KeyRanges, error) {
ranges := ranger.FullIntRange(false)
if tableInfo.IsCommonHandle {
ranges = ranger.FullRange()
@@ -221,18 +221,9 @@ func tableIndexKeyRanges(tableInfo *model.TableInfo, indexInfo *model.IndexInfo) ([]tidbkv.KeyRange, error)
}

// tableIndexKeyRanges returns all key ranges associated with the tableInfo and indexInfo.
-func tableIndexKeyRanges(tableInfo *model.TableInfo, indexInfo *model.IndexInfo) ([]tidbkv.KeyRange, error) {
+func tableIndexKeyRanges(tableInfo *model.TableInfo, indexInfo *model.IndexInfo) (*tidbkv.KeyRanges, error) {
tableIDs := physicalTableIDs(tableInfo)
-	//nolint: prealloc
-	var keyRanges []tidbkv.KeyRange
-	for _, tid := range tableIDs {
-		partitionKeysRanges, err := distsql.IndexRangesToKVRanges(nil, tid, indexInfo.ID, ranger.FullRange(), nil)
-		if err != nil {
-			return nil, errors.Trace(err)
-		}
-		keyRanges = append(keyRanges, partitionKeysRanges...)
-	}
-	return keyRanges, nil
+	return distsql.IndexRangesToKVRangesForTables(nil, tableIDs, indexInfo.ID, ranger.FullRange(), nil)
}

// DupKVStream is a streaming interface for collecting duplicate key-value pairs.
@@ -561,14 +552,20 @@ func (m *DuplicateManager) buildDupTasks() ([]dupTask, error) {
if err != nil {
return nil, errors.Trace(err)
}
-	tasks := make([]dupTask, 0, len(keyRanges))
-	for _, kr := range keyRanges {
-		tableID := tablecodec.DecodeTableID(kr.StartKey)
-		tasks = append(tasks, dupTask{
-			KeyRange: kr,
-			tableID:  tableID,
-		})
+	tasks := make([]dupTask, 0, keyRanges.TotalRangeNum()*(1+len(m.tbl.Meta().Indices)))
+	putToTaskFunc := func(ranges []tidbkv.KeyRange) {
+		if len(ranges) == 0 {
+			return
+		}
+		tid := tablecodec.DecodeTableID(ranges[0].StartKey)
+		for _, r := range ranges {
+			tasks = append(tasks, dupTask{
+				KeyRange: r,
+				tableID:  tid,
+			})
+		}
+	}
+	keyRanges.ForEachPartition(putToTaskFunc)
for _, indexInfo := range m.tbl.Meta().Indices {
if indexInfo.State != model.StatePublic {
continue
@@ -577,14 +574,7 @@ func (m *DuplicateManager) buildDupTasks() ([]dupTask, error) {
if err != nil {
return nil, errors.Trace(err)
}
-		for _, kr := range keyRanges {
-			tableID := tablecodec.DecodeTableID(kr.StartKey)
-			tasks = append(tasks, dupTask{
-				KeyRange:  kr,
-				tableID:   tableID,
-				indexInfo: indexInfo,
-			})
-		}
+		keyRanges.ForEachPartition(putToTaskFunc)
}
return tasks, nil
}
@@ -598,15 +588,19 @@ func (m *DuplicateManager) buildIndexDupTasks() ([]dupTask, error) {
if err != nil {
return nil, errors.Trace(err)
}
-		tasks := make([]dupTask, 0, len(keyRanges))
-		for _, kr := range keyRanges {
-			tableID := tablecodec.DecodeTableID(kr.StartKey)
-			tasks = append(tasks, dupTask{
-				KeyRange:  kr,
-				tableID:   tableID,
-				indexInfo: indexInfo,
-			})
-		}
+		tasks := make([]dupTask, 0, keyRanges.TotalRangeNum())
+		keyRanges.ForEachPartition(func(ranges []tidbkv.KeyRange) {
+			if len(ranges) == 0 {
+				return
+			}
+			tid := tablecodec.DecodeTableID(ranges[0].StartKey)
+			for _, r := range ranges {
+				tasks = append(tasks, dupTask{
+					KeyRange: r,
+					tableID:  tid,
+				})
+			}
+		})
return tasks, nil
}
return nil, nil
10 changes: 10 additions & 0 deletions br/pkg/logutil/logging.go
@@ -306,3 +306,13 @@ func (rng StringifyRange) String() string {
sb.WriteString(")")
return sb.String()
}

+// StringifyMany returns an array marshaler for a slice of stringers.
+func StringifyMany[T fmt.Stringer](items []T) zapcore.ArrayMarshaler {
+	return zapcore.ArrayMarshalerFunc(func(ae zapcore.ArrayEncoder) error {
+		for _, item := range items {
+			ae.AppendString(item.String())
+		}
+		return nil
+	})
+}
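`StringifyMany` lets callers log a slice of `fmt.Stringer` values as a single zap array field without first materializing a `[]string`. A small usage sketch (the `region` type is illustrative, not from the diff):

```go
package main

import (
    "fmt"

    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
)

// StringifyMany as defined above.
func StringifyMany[T fmt.Stringer](items []T) zapcore.ArrayMarshaler {
    return zapcore.ArrayMarshalerFunc(func(ae zapcore.ArrayEncoder) error {
        for _, item := range items {
            ae.AppendString(item.String())
        }
        return nil
    })
}

type region struct{ id uint64 }

func (r region) String() string { return fmt.Sprintf("region-%d", r.id) }

func main() {
    logger, _ := zap.NewDevelopment()
    regions := []region{{1}, {2}, {3}}
    // The slice is stringified only when the entry is actually encoded.
    logger.Info("backup ranges", zap.Array("regions", StringifyMany(regions)))
}
```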
23 changes: 14 additions & 9 deletions br/pkg/restore/data.go
@@ -372,6 +372,7 @@ type RecoverRegion struct {
// 2. build a leader list for all region during the tikv startup
// 3. get max allocate id
func (recovery *Recovery) MakeRecoveryPlan() error {
+	storeBalanceScore := make(map[uint64]int, len(recovery.allStores))
// Group region peer info by region id. find the max allocateId
// region [id] [peer[0-n]]
var regions = make(map[uint64][]*RecoverRegion, 0)
@@ -410,16 +411,20 @@ func (recovery *Recovery) MakeRecoveryPlan() error {
}
} else {
// Generate normal commands.
log.Debug("detected valid peer", zap.Uint64("region id", regionId))
for i, peer := range peers {
log.Debug("make plan", zap.Uint64("store id", peer.StoreId), zap.Uint64("region id", peer.RegionId))
plan := &recovpb.RecoverRegionRequest{RegionId: peer.RegionId, AsLeader: i == 0}
// sorted by log term -> last index -> commit index in a region
if plan.AsLeader {
log.Debug("as leader peer", zap.Uint64("store id", peer.StoreId), zap.Uint64("region id", peer.RegionId))
recovery.RecoveryPlan[peer.StoreId] = append(recovery.RecoveryPlan[peer.StoreId], plan)
}
log.Debug("detected valid region", zap.Uint64("region id", regionId))
// calc the leader candidates
leaderCandidates, err := LeaderCandidates(peers)
if err != nil {
log.Warn("region without peer", zap.Uint64("region id", regionId))
return errors.Trace(err)
}

// select the leader base on tikv storeBalanceScore
leader := SelectRegionLeader(storeBalanceScore, leaderCandidates)
log.Debug("as leader peer", zap.Uint64("store id", leader.StoreId), zap.Uint64("region id", leader.RegionId))
plan := &recovpb.RecoverRegionRequest{RegionId: leader.RegionId, AsLeader: true}
recovery.RecoveryPlan[leader.StoreId] = append(recovery.RecoveryPlan[leader.StoreId], plan)
storeBalanceScore[leader.StoreId] += 1
}
}
return nil
40 changes: 40 additions & 0 deletions br/pkg/restore/util.go
@@ -750,3 +750,43 @@ func CheckConsistencyAndValidPeer(regionInfos []*RecoverRegionInfo) (map[uint64]
}
return validPeers, nil
}

+// LeaderCandidates selects all peers that can be chosen as the leader during the restore.
+// In the cloud, IOPS and bandwidth limits make raft writes slow, so the raft state
+// (log term, last index, commit index...) is usually the same among the peers.
+func LeaderCandidates(peers []*RecoverRegion) ([]*RecoverRegion, error) {
+	if peers == nil {
+		return nil, errors.Annotatef(berrors.ErrRestoreRegionWithoutPeer,
+			"invalid region range")
+	}
+	candidates := make([]*RecoverRegion, 0, len(peers))
+	// by default, peers[0] is assigned as the leader, since peers are already sorted by the leader selection rule
+	leader := peers[0]
+	candidates = append(candidates, leader)
+	for _, peer := range peers[1:] {
+		// a qualified candidate satisfies leader.logTerm == candidate.logTerm &&
+		// leader.lastIndex == candidate.lastIndex && leader.commitIndex == candidate.commitIndex
+		if peer.LastLogTerm == leader.LastLogTerm && peer.LastIndex == leader.LastIndex && peer.CommitIndex == leader.CommitIndex {
+			log.Debug("leader candidate", zap.Uint64("store id", peer.StoreId), zap.Uint64("region id", peer.RegionId), zap.Uint64("peer id", peer.PeerId))
+			candidates = append(candidates, peer)
+		}
+	}
+	return candidates, nil
+}
+
+// SelectRegionLeader selects the candidate on the store with the lowest balance score.
+// For example, region A has candidate leaders x, y, z:
+// peer x on store 1 with storeBalanceScore 3
+// peer y on store 3 with storeBalanceScore 2
+// peer z on store 4 with storeBalanceScore 1
+// result: peer z will be selected as the leader on store 4
+func SelectRegionLeader(storeBalanceScore map[uint64]int, peers []*RecoverRegion) *RecoverRegion {
+	// by default, peers[0] is assigned as the leader
+	leader := peers[0]
+	minLeaderStore := storeBalanceScore[leader.StoreId]
+	for _, peer := range peers[1:] {
+		log.Debug("leader candidate", zap.Int("score", storeBalanceScore[peer.StoreId]), zap.Int("min-score", minLeaderStore), zap.Uint64("store id", peer.StoreId), zap.Uint64("region id", peer.RegionId), zap.Uint64("peer id", peer.PeerId))
+		if storeBalanceScore[peer.StoreId] < minLeaderStore {
+			minLeaderStore = storeBalanceScore[peer.StoreId]
+			leader = peer
+		}
+	}
+	return leader
+}
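Together these two helpers spread recovered leaders across stores: `LeaderCandidates` filters peers to those sharing the leader's raft state, and `SelectRegionLeader` greedily picks the candidate whose store has been assigned the fewest leaders so far. A standalone sketch of the greedy balancing (candidate lists here are assumed to have already passed the raft-state check):

```go
package main

import "fmt"

// peer is a pared-down RecoverRegion: just enough for leader placement.
type peer struct {
    regionID, storeID uint64
}

// selectLeader mirrors SelectRegionLeader: keep candidates[0] unless another
// candidate sits on a store with a strictly lower balance score.
func selectLeader(score map[uint64]int, candidates []peer) peer {
    leader := candidates[0]
    for _, p := range candidates[1:] {
        if score[p.storeID] < score[leader.storeID] {
            leader = p
        }
    }
    return leader
}

func main() {
    score := map[uint64]int{} // storeBalanceScore: leaders assigned per store
    regionCandidates := [][]peer{
        {{1, 1}, {1, 2}, {1, 3}},
        {{2, 1}, {2, 2}},
        {{3, 1}, {3, 3}},
    }
    for _, candidates := range regionCandidates {
        l := selectLeader(score, candidates)
        score[l.storeID]++ // mirrors storeBalanceScore[leader.StoreId] += 1
        fmt.Printf("region %d -> leader on store %d\n", l.regionID, l.storeID)
    }
    // Leaders land on stores 1, 2, and 3 rather than all piling up on store 1.
}
```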