Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: restore tiflash replica count after PiTR #37181

Merged
merged 21 commits into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/stream"
Expand Down Expand Up @@ -1630,20 +1631,30 @@ func (rc *Client) GetRebasedTables() map[UniqueTableName]bool {
return rc.rebasedTablesMap
}

func (rc *Client) getTiFlashNodeCount(ctx context.Context) (uint64, error) {
tiFlashStores, err := util.GetAllTiKVStores(ctx, rc.pdClient, util.TiFlashOnly)
if err != nil {
return 0, errors.Trace(err)
}
return uint64(len(tiFlashStores)), nil
}

// PreCheckTableTiFlashReplica checks whether TiFlash replica is less than TiFlash node.
func (rc *Client) PreCheckTableTiFlashReplica(
ctx context.Context,
tables []*metautil.Table,
skipTiflash bool,
recorder *tiflashrec.TiFlashRecorder,
) error {
tiFlashStores, err := util.GetAllTiKVStores(ctx, rc.pdClient, util.TiFlashOnly)
tiFlashStoreCount, err := rc.getTiFlashNodeCount(ctx)
if err != nil {
return errors.Trace(err)
return err
}
tiFlashStoreCount := len(tiFlashStores)
for _, table := range tables {
if skipTiflash ||
(table.Info.TiFlashReplica != nil && table.Info.TiFlashReplica.Count > uint64(tiFlashStoreCount)) {
if recorder != nil ||
(table.Info.TiFlashReplica != nil && table.Info.TiFlashReplica.Count > tiFlashStoreCount) {
if recorder != nil && table.Info.TiFlashReplica != nil {
recorder.AddTable(table.Info.ID, *table.Info.TiFlashReplica)
}
// we cannot satisfy TiFlash replica in restore cluster. so we should
// set TiFlashReplica to unavailable in tableInfo, to avoid TiDB cannot sense TiFlash and make plan to TiFlash
// see details at https://github.com/pingcap/br/issues/931
Expand Down Expand Up @@ -1986,7 +1997,8 @@ func (rc *Client) InitSchemasReplaceForDDL(
}()...)
}

return stream.NewSchemasReplace(dbMap, rc.currentTS, tableFilter, rc.GenGlobalID, rc.GenGlobalIDs, rc.InsertDeleteRangeForTable, rc.InsertDeleteRangeForIndex), nil
rp := stream.NewSchemasReplace(dbMap, rc.currentTS, tableFilter, rc.GenGlobalID, rc.GenGlobalIDs, rc.InsertDeleteRangeForTable, rc.InsertDeleteRangeForIndex)
return rp, nil
}

func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo {
Expand Down
5 changes: 3 additions & 2 deletions br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -383,7 +384,7 @@ func TestPreCheckTableTiFlashReplicas(t *testing.T) {
}
}
ctx := context.Background()
require.Nil(t, client.PreCheckTableTiFlashReplica(ctx, tables, false))
require.Nil(t, client.PreCheckTableTiFlashReplica(ctx, tables, nil))

for i := 0; i < len(tables); i++ {
if i == 0 || i > 2 {
Expand All @@ -395,7 +396,7 @@ func TestPreCheckTableTiFlashReplicas(t *testing.T) {
}
}

require.Nil(t, client.PreCheckTableTiFlashReplica(ctx, tables, true))
require.Nil(t, client.PreCheckTableTiFlashReplica(ctx, tables, tiflashrec.New()))
for i := 0; i < len(tables); i++ {
require.Nil(t, tables[i].Info.TiFlashReplica)
}
Expand Down
129 changes: 129 additions & 0 deletions br/pkg/restore/tiflashrec/tiflash_recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright 2022-present PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tiflashrec

import (
"bytes"
"fmt"

"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/format"
"github.com/pingcap/tidb/parser/model"
"go.uber.org/zap"
)

// TiFlashRecorder records the information of TiFlash replicas
// during restoring.
// Because the limit of the current implementation, we add serval hooks
// to observe the information we need:
// - Before full restore create tables:
// We record the tiflash replica information and remove the replica info.
// Because during PiTR restore, the transaction model would be broken, which breaks TiFlash too.
// We must make sure they won't be replicated to TiFlash during the whole PiTR procedure.
// - After full restore created tables, generating rewrite rules:
// We perform the rewrite rule over our records.
// We trace table via table ID instead of table name so we can handle `RENAME` DDLs.
// - When doing PiTR restore, after rewriting table info in meta key:
// We update the replica information
type TiFlashRecorder struct {
// Table ID -> TiFlash Count
items map[int64]model.TiFlashReplicaInfo
}

func New() *TiFlashRecorder {
return &TiFlashRecorder{
items: map[int64]model.TiFlashReplicaInfo{},
}
}

func (r *TiFlashRecorder) AddTable(tableID int64, replica model.TiFlashReplicaInfo) {
log.Info("recording tiflash replica", zap.Int64("table", tableID), zap.Any("replica", replica))
r.items[tableID] = replica
}

func (r *TiFlashRecorder) DelTable(tableID int64) {
delete(r.items, tableID)
}

func (r *TiFlashRecorder) Iterate(f func(tableID int64, replica model.TiFlashReplicaInfo)) {
for k, v := range r.items {
f(k, v)
}
}

func (r *TiFlashRecorder) Rewrite(oldID int64, newID int64) {
if newID == oldID {
return
}
old, ok := r.items[oldID]
joccau marked this conversation as resolved.
Show resolved Hide resolved
log.Info("rewriting tiflash replica", zap.Int64("old", oldID), zap.Int64("new", newID), zap.Bool("success", ok))
if ok {
r.items[newID] = old
delete(r.items, oldID)
}
}

func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []string {
items := make([]string, 0, len(r.items))
r.Iterate(func(id int64, replica model.TiFlashReplicaInfo) {
table, ok := info.TableByID(id)
if !ok {
log.Warn("Table do not exist, skipping", zap.Int64("id", id))
return
}
schema, ok := info.SchemaByTable(table.Meta())
if !ok {
log.Warn("Schema do not exist, skipping", zap.Int64("id", id), zap.Stringer("table", table.Meta().Name))
return
}
altTableSpec, err := alterTableSpecOf(replica)
if err != nil {
log.Warn("Failed to generate the alter table spec", logutil.ShortError(err), zap.Any("replica", replica))
return
}
items = append(items, fmt.Sprintf(
"ALTER TABLE %s %s",
utils.EncloseDBAndTable(schema.Name.O, table.Meta().Name.O),
altTableSpec),
)
})
return items
}

func alterTableSpecOf(replica model.TiFlashReplicaInfo) (string, error) {
spec := &ast.AlterTableSpec{
Tp: ast.AlterTableSetTiFlashReplica,
TiFlashReplica: &ast.TiFlashReplicaSpec{
Count: replica.Count,
Labels: replica.LocationLabels,
},
}

buf := bytes.NewBuffer(make([]byte, 0, 32))
restoreCx := format.NewRestoreCtx(
format.RestoreKeyWordUppercase|
format.RestoreNameBackQuotes|
format.RestoreStringSingleQuotes|
format.RestoreStringEscapeBackslash,
buf)
if err := spec.Restore(restoreCx); err != nil {
return "", err
}
return buf.String(), nil
}
172 changes: 172 additions & 0 deletions br/pkg/restore/tiflashrec/tiflash_recorder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright 2022-present PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tiflashrec_test

import (
"fmt"
"testing"

"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/model"
"github.com/stretchr/testify/require"
)

type op func(*tiflashrec.TiFlashRecorder)

func add(tableID int64, replica int) op {
return func(tfr *tiflashrec.TiFlashRecorder) {
tfr.AddTable(tableID, model.TiFlashReplicaInfo{
Count: uint64(replica),
})
}
}

func rewrite(tableID, newTableID int64) op {
return func(tfr *tiflashrec.TiFlashRecorder) {
tfr.Rewrite(tableID, newTableID)
}
}

func del(tableID int64) op {
return func(tfr *tiflashrec.TiFlashRecorder) {
tfr.DelTable(tableID)
}
}

func ops(ops ...op) op {
return func(tfr *tiflashrec.TiFlashRecorder) {
for _, op := range ops {
op(tfr)
}
}
}

type table struct {
id int64
replica int
}

func t(id int64, replica int) table {
return table{
id: id,
replica: replica,
}
}

func TestRecorder(tCtx *testing.T) {
type Case struct {
o op
ts []table
}
cases := []Case{
{
o: ops(
add(42, 1),
add(43, 2),
),
ts: []table{
t(42, 1),
t(43, 2),
},
},
{
o: ops(
add(42, 3),
add(43, 1),
del(42),
),
ts: []table{
t(43, 1),
},
},
{
o: ops(
add(41, 4),
add(42, 8),
rewrite(42, 1890),
rewrite(1890, 43),
rewrite(41, 100),
),
ts: []table{
t(43, 8),
t(100, 4),
},
},
}

check := func(t *testing.T, c Case) {
rec := tiflashrec.New()
req := require.New(t)
c.o(rec)
tmap := map[int64]int{}
for _, t := range c.ts {
tmap[t.id] = t.replica
}

rec.Iterate(func(tableID int64, replicaReal model.TiFlashReplicaInfo) {
replica, ok := tmap[tableID]
req.True(ok, "the key %d not recorded", tableID)
req.EqualValues(replica, replicaReal.Count, "the replica mismatch")
delete(tmap, tableID)
})
req.Empty(tmap, "not all required are recorded")
}

for i, c := range cases {
tCtx.Run(fmt.Sprintf("#%d", i), func(t *testing.T) {
check(t, c)
})
}
}

func TestGenSql(t *testing.T) {
tInfo := func(id int, name string) *model.TableInfo {
return &model.TableInfo{
ID: int64(id),
Name: model.NewCIStr(name),
}
}
fakeInfo := infoschema.MockInfoSchema([]*model.TableInfo{
tInfo(1, "fruits"),
tInfo(2, "whisper"),
tInfo(3, "woods"),
tInfo(4, "evils"),
})
rec := tiflashrec.New()
rec.AddTable(1, model.TiFlashReplicaInfo{
Count: 1,
})
rec.AddTable(2, model.TiFlashReplicaInfo{
Count: 2,
LocationLabels: []string{"climate"},
})
rec.AddTable(3, model.TiFlashReplicaInfo{
Count: 3,
LocationLabels: []string{"leaf", "seed"},
})
rec.AddTable(4, model.TiFlashReplicaInfo{
Count: 1,
LocationLabels: []string{`kIll'; OR DROP DATABASE test --`, `dEaTh with \"quoting\"`},
})

sqls := rec.GenerateAlterTableDDLs(fakeInfo)
require.ElementsMatch(t, sqls, []string{
"ALTER TABLE `test`.`whisper` SET TIFLASH REPLICA 2 LOCATION LABELS 'climate'",
"ALTER TABLE `test`.`woods` SET TIFLASH REPLICA 3 LOCATION LABELS 'leaf', 'seed'",
"ALTER TABLE `test`.`fruits` SET TIFLASH REPLICA 1",
"ALTER TABLE `test`.`evils` SET TIFLASH REPLICA 1 LOCATION LABELS 'kIll''; OR DROP DATABASE test --', 'dEaTh with " + `\\"quoting\\"` + "'",
})
}
Loading