
Commit

Merge branch 'release-5.4' into release-5.4-2f2fa06c2fa5
sleepymole authored Jun 27, 2022
2 parents 663a90a + 8f13676 commit b2449a1
Showing 138 changed files with 3,152 additions and 932 deletions.
112 changes: 66 additions & 46 deletions br/metrics/grafana/br.json

Large diffs are not rendered by default.

84 changes: 52 additions & 32 deletions br/metrics/grafana/lightning.json

Large diffs are not rendered by default.

45 changes: 34 additions & 11 deletions br/pkg/backup/client.go
@@ -530,7 +530,7 @@ func (bc *Client) BackupRange(
// TODO: test fine grained backup.
err = bc.fineGrainedBackup(
ctx, startKey, endKey, req.StartVersion, req.EndVersion, req.CompressionType, req.CompressionLevel,
req.RateLimit, req.Concurrency, results, progressCallBack)
req.RateLimit, req.Concurrency, req.IsRawKv, req.CipherInfo, results, progressCallBack)
if err != nil {
return errors.Trace(err)
}
@@ -574,10 +574,12 @@ func (bc *Client) BackupRange(
return nil
}

func (bc *Client) findRegionLeader(ctx context.Context, key []byte) (*metapb.Peer, error) {
func (bc *Client) findRegionLeader(ctx context.Context, key []byte, isRawKv bool) (*metapb.Peer, error) {
// Keys are saved in encoded format in TiKV, so the key must be encoded
// in order to find the correct region.
key = codec.EncodeBytes([]byte{}, key)
if !isRawKv {
key = codec.EncodeBytes([]byte{}, key)
}
for i := 0; i < 5; i++ {
// better backoff.
region, err := bc.mgr.GetPDClient().GetRegion(ctx, key)
@@ -608,6 +610,8 @@ func (bc *Client) fineGrainedBackup(
compressLevel int32,
rateLimit uint64,
concurrency uint32,
isRawKv bool,
cipherInfo *backuppb.CipherInfo,
rangeTree rtree.RangeTree,
progressCallBack func(ProgressUnit),
) error {
@@ -658,7 +662,7 @@
for rg := range retry {
backoffMs, err :=
bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS,
compressType, compressLevel, rateLimit, concurrency, respCh)
compressType, compressLevel, rateLimit, concurrency, isRawKv, cipherInfo, respCh)
if err != nil {
errCh <- err
return
@@ -803,9 +807,11 @@ func (bc *Client) handleFineGrained(
compressionLevel int32,
rateLimit uint64,
concurrency uint32,
isRawKv bool,
cipherInfo *backuppb.CipherInfo,
respCh chan<- *backuppb.BackupResponse,
) (int, error) {
leader, pderr := bc.findRegionLeader(ctx, rg.StartKey)
leader, pderr := bc.findRegionLeader(ctx, rg.StartKey, isRawKv)
if pderr != nil {
return 0, errors.Trace(pderr)
}
@@ -820,8 +826,10 @@
StorageBackend: bc.backend,
RateLimit: rateLimit,
Concurrency: concurrency,
IsRawKv: isRawKv,
CompressionType: compressType,
CompressionLevel: compressionLevel,
CipherInfo: cipherInfo,
}
lockResolver := bc.mgr.GetLockResolver()
client, err := bc.mgr.GetBackupClient(ctx, storeID)
@@ -926,9 +934,17 @@ backupLoop:
})
bcli, err := client.Backup(ctx, &req)
failpoint.Inject("reset-retryable-error", func(val failpoint.Value) {
if val.(bool) {
logutil.CL(ctx).Debug("failpoint reset-retryable-error injected.")
err = status.Error(codes.Unavailable, "Unavailable error")
switch val.(string) {
case "Unavaiable":
{
logutil.CL(ctx).Debug("failpoint reset-retryable-error unavailable injected.")
err = status.Error(codes.Unavailable, "Unavailable error")
}
case "Internal":
{
logutil.CL(ctx).Debug("failpoint reset-retryable-error internal injected.")
err = status.Error(codes.Internal, "Internal error")
}
}
})
failpoint.Inject("reset-not-retryable-error", func(val failpoint.Value) {
@@ -994,16 +1010,23 @@ const (

// isRetryableError reports whether we should retry resetting the gRPC connection.
func isRetryableError(err error) bool {

if status.Code(err) == codes.Unavailable {
return true
// some errors can be retried
// https://github.com/pingcap/tidb/issues/34350
switch status.Code(err) {
case codes.Unavailable, codes.DeadlineExceeded,
codes.ResourceExhausted, codes.Aborted, codes.Internal:
{
log.Warn("backup met some errors, these errors can be retry 5 times", zap.Error(err))
return true
}
}

// At least, there are two possible cancel() calls:
// one from the backup range, another from gRPC; here we retry when gRPC cancels with the connection closing.
if status.Code(err) == codes.Canceled {
if s, ok := status.FromError(err); ok {
if strings.Contains(s.Message(), gRPC_Cancel) {
log.Warn("backup met grpc cancel error, this errors can be retry 5 times", zap.Error(err))
return true
}
}
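The hunk above widens the set of gRPC status codes that trigger a connection reset and retry. The following is a minimal, hedged sketch of how such a check can drive a reset-and-retry loop; it is not the call site in client.go, and the helper names (doBackupOnce, resetConn) as well as the budget of 5 attempts are assumptions taken from the log messages above.

package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isRetryableError mirrors the retryable-code check added above (minus the
// Canceled/connection-closing special case) so the sketch is self-contained.
func isRetryableError(err error) bool {
	switch status.Code(err) {
	case codes.Unavailable, codes.DeadlineExceeded,
		codes.ResourceExhausted, codes.Aborted, codes.Internal:
		return true
	}
	return false
}

// backupWithRetry is a hypothetical wrapper: doBackupOnce and resetConn are
// placeholders supplied by the caller.
func backupWithRetry(ctx context.Context, doBackupOnce func(context.Context) error, resetConn func() error) error {
	var lastErr error
	for attempt := 0; attempt < 5; attempt++ {
		lastErr = doBackupOnce(ctx)
		if lastErr == nil || !isRetryableError(lastErr) {
			return lastErr
		}
		// A retryable gRPC error: reset the connection before trying again.
		if err := resetConn(); err != nil {
			return err
		}
	}
	return lastErr
}

func main() {
	calls := 0
	err := backupWithRetry(context.Background(),
		func(context.Context) error {
			calls++
			if calls < 3 {
				return status.Error(codes.Unavailable, "store temporarily unavailable")
			}
			return nil
		},
		func() error { return nil },
	)
	fmt.Println("calls:", calls, "err:", err) // calls: 3 err: <nil>
}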
24 changes: 24 additions & 0 deletions br/pkg/backup/push.go
@@ -11,6 +11,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/metapb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
@@ -116,6 +117,7 @@ func (push *pushDown) pushBackup(
close(push.respCh)
}()

regionErrorIngestedOnce := false
for {
select {
case respAndStore, ok := <-push.respCh:
@@ -125,6 +127,13 @@
// Finished.
return res, nil
}
failpoint.Inject("backup-timeout-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint backup-timeout-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Msg: msg,
}
})
failpoint.Inject("backup-storage-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint backup-storage-error injected.", zap.String("msg", msg))
@@ -139,6 +148,21 @@
Msg: msg,
}
})
failpoint.Inject("tikv-region-error", func(val failpoint.Value) {
if !regionErrorIngestedOnce {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint tikv-regionh-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
// Msg: msg,
Detail: &backuppb.Error_RegionError{
RegionError: &errorpb.Error{
Message: msg,
},
},
}
}
regionErrorIngestedOnce = true
})
if resp.GetError() == nil {
// No error means the range has been backed up successfully.
res.Put(
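The failpoints added in push.go (backup-timeout-error, tikv-region-error) let tests simulate TiKV-side errors in the push-down responses. A hedged sketch of how a test might enable them is shown below; the exact failpoint paths are assumptions based on the usual "<package import path>/<failpoint name>" convention of github.com/pingcap/failpoint, so verify them against the actual tests in this repository.

package backup_test

import (
	"testing"

	"github.com/pingcap/failpoint"
)

func TestBackupRetriesOnInjectedErrors(t *testing.T) {
	const (
		regionErrFP  = "github.com/pingcap/tidb/br/pkg/backup/tikv-region-error"
		timeoutErrFP = "github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error"
	)
	// Inject a region error (once) and a timeout error message into the push-down responses.
	if err := failpoint.Enable(regionErrFP, `return("region epoch not match")`); err != nil {
		t.Fatal(err)
	}
	defer func() { _ = failpoint.Disable(regionErrFP) }()

	if err := failpoint.Enable(timeoutErrFP, `return("deadline exceeded")`); err != nil {
		t.Fatal(err)
	}
	defer func() { _ = failpoint.Disable(timeoutErrFP) }()

	// ... run the backup under test here and assert that it retries and
	// eventually succeeds or surfaces the expected error ...
}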
1 change: 1 addition & 0 deletions br/pkg/glue/glue.go
@@ -37,6 +37,7 @@ type Session interface {
CreateDatabase(ctx context.Context, schema *model.DBInfo) error
CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error
Close()
GetGlobalVariable(name string) (string, error)
}

// Progress is an interface recording the current execution progress.
5 changes: 5 additions & 0 deletions br/pkg/gluetidb/glue.go
@@ -151,6 +151,11 @@ func (gs *tidbSession) Close() {
gs.se.Close()
}

// GetGlobalVariable implements glue.Session.
func (gs *tidbSession) GetGlobalVariable(name string) (string, error) {
return gs.se.GetSessionVars().GlobalVarsAccessor.GetTiDBTableValue(name)
}

// showCreateTable shows the result of SHOW CREATE TABLE from a TableInfo.
func (gs *tidbSession) showCreateTable(tbl *model.TableInfo) (string, error) {
table := tbl.Clone()
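The new glue.Session.GetGlobalVariable accessor (declared in glue.go and implemented above via GlobalVarsAccessor.GetTiDBTableValue) gives BR a uniform way to read cluster-level values. Below is a hedged usage sketch; the variable name is left to the caller because this commit only defines the accessor, not any particular consumer.

package brexample

import (
	"github.com/pingcap/log"
	"github.com/pingcap/tidb/br/pkg/glue"
	"go.uber.org/zap"
)

// readClusterVariable reads one value through the new accessor and logs it.
func readClusterVariable(se glue.Session, name string) (string, error) {
	val, err := se.GetGlobalVariable(name)
	if err != nil {
		return "", err
	}
	log.Info("read cluster variable", zap.String("name", name), zap.String("value", val))
	return val, nil
}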
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/local.go
@@ -260,7 +260,7 @@ func NewLocalBackend(
if err != nil {
return backend.MakeBackend(nil), errors.Annotate(err, "construct pd client failed")
}
splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig())
splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig(), false)

shouldCreate := true
if cfg.Checkpoint.Enable {
11 changes: 11 additions & 0 deletions br/pkg/logutil/logging.go
@@ -209,8 +209,19 @@ func Keys(keys [][]byte) zap.Field {
return zap.Object("keys", zapKeysMarshaler(keys))
}

// AShortError makes a zap field with the given key that displays the error without its verbose representation (e.g. the stack trace).
func AShortError(key string, err error) zap.Field {
if err == nil {
return zap.Skip()
}
return zap.String(key, err.Error())
}

// ShortError makes a zap field that displays the error without its verbose representation (e.g. the stack trace).
func ShortError(err error) zap.Field {
if err == nil {
return zap.Skip()
}
return zap.String("error", err.Error())
}

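A short, hedged sketch of how the two logutil helpers differ in practice: both drop the stack trace, and AShortError additionally lets the caller choose the field key, which is handy when one log line carries several errors. The surrounding function and messages are illustrative, not code from this commit.

package main

import (
	"github.com/pingcap/errors"
	"github.com/pingcap/log"
	"github.com/pingcap/tidb/br/pkg/logutil"
)

func logBackupFailure(storeErr, downloadErr error) {
	// ShortError always uses the "error" key.
	log.Warn("backup range failed", logutil.ShortError(storeErr))
	// AShortError keeps two errors distinguishable in the same log line.
	log.Warn("restore file failed",
		logutil.AShortError("download-error", downloadErr),
		logutil.AShortError("store-error", storeErr))
}

func main() {
	// nil errors become zap.Skip() per the implementation above.
	logBackupFailure(errors.New("store 1 unreachable"), nil)
}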
2 changes: 1 addition & 1 deletion br/pkg/pdutil/pd.go
@@ -378,7 +378,7 @@ func (p *PdController) getStoreInfoWith(

func (p *PdController) doPauseSchedulers(ctx context.Context, schedulers []string, post pdHTTPRequest) ([]string, error) {
// pause this scheduler for 300 seconds
body, err := json.Marshal(pauseSchedulerBody{Delay: int64(pauseTimeout)})
body, err := json.Marshal(pauseSchedulerBody{Delay: int64(pauseTimeout.Seconds())})
if err != nil {
return nil, errors.Trace(err)
}
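A hedged illustration of why this one-character-looking change matters: pauseTimeout is a time.Duration (300 seconds per the comment, an assumption here), so a raw int64 conversion yields nanoseconds, whereas the fix sends the whole-second count, which is what the PD pause-scheduler request appears to expect given this change.

package main

import (
	"fmt"
	"time"
)

func main() {
	pauseTimeout := 300 * time.Second
	fmt.Println(int64(pauseTimeout))           // 300000000000 (nanoseconds) - what was marshaled before
	fmt.Println(int64(pauseTimeout.Seconds())) // 300 - what the fix marshals into Delay
}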
53 changes: 34 additions & 19 deletions br/pkg/restore/client.go
@@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
@@ -102,6 +103,7 @@ func NewRestoreClient(
store kv.Storage,
tlsConf *tls.Config,
keepaliveConf keepalive.ClientParameters,
isRawKv bool,
) (*Client, error) {
db, err := NewDB(g, store)
if err != nil {
@@ -117,16 +119,19 @@
if dom != nil {
statsHandle = dom.StatsHandle()
}
// init backupMeta only for passing unit tests
backupMeta := new(backuppb.BackupMeta)

return &Client{
pdClient: pdClient,
toolClient: NewSplitClient(pdClient, tlsConf),
toolClient: NewSplitClient(pdClient, tlsConf, isRawKv),
db: db,
tlsConf: tlsConf,
keepaliveConf: keepaliveConf,
switchCh: make(chan struct{}),
dom: dom,
statsHandler: statsHandle,
backupMeta: backupMeta,
}, nil
}

@@ -205,7 +210,7 @@ func (rc *Client) InitBackupMeta(
rc.backupMeta = backupMeta
log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs)))

metaClient := NewSplitClient(rc.pdClient, rc.tlsConf)
metaClient := NewSplitClient(rc.pdClient, rc.tlsConf, rc.backupMeta.IsRawKv)
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, rc.backupMeta.IsRawKv, rc.rateLimit)
return rc.fileImporter.CheckMultiIngestSupport(c, rc.pdClient)
@@ -450,8 +455,7 @@ func (rc *Client) GoCreateTables(
) <-chan CreatedTable {
// Could we have a smaller size of tables?
log.Info("start create tables")

ddlTables := rc.DDLJobsMap()
ddlTables := rc.GenerateRebasedTables(tables)
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("Client.GoCreateTables", opentracing.ChildOf(span.Context()))
defer span1.Finish()
@@ -595,6 +599,15 @@ func drainFilesByRange(files []*backuppb.File, supportMulti bool) ([]*backuppb.F
return files[:idx], files[idx:]
}

// SplitRanges implements TiKVRestorer.
func (rc *Client) SplitRanges(ctx context.Context,
ranges []rtree.Range,
rewriteRules *RewriteRules,
updateCh glue.Progress,
isRawKv bool) error {
return SplitRanges(ctx, rc, ranges, rewriteRules, updateCh, isRawKv)
}

// RestoreFiles tries to restore the files.
func (rc *Client) RestoreFiles(
ctx context.Context,
@@ -1119,22 +1132,24 @@ func (rc *Client) IsSkipCreateSQL() bool {
return rc.noSchema
}

// DDLJobsMap returns a map[UniqueTableName]bool about < db table, hasCreate/hasTruncate DDL >.
// if we execute some DDLs before create table.
// we may get two situation that need to rebase auto increment/random id.
// 1. truncate table: truncate will generate new id cache.
// 2. create table/create and rename table: the first create table will lock down the id cache.
// because we cannot create onExistReplace table.
// so the final create DDL with the correct auto increment/random id won't be executed.
func (rc *Client) DDLJobsMap() map[UniqueTableName]bool {
m := make(map[UniqueTableName]bool)
for _, job := range rc.ddlJobs {
switch job.Type {
case model.ActionTruncateTable, model.ActionCreateTable, model.ActionRenameTable:
m[UniqueTableName{job.SchemaName, job.BinlogInfo.TableInfo.Name.String()}] = true
}
// GenerateRebasedTables generates a map[UniqueTableName]bool of tables whose table info has not been updated.
// There are two situations:
// 1. tables that already exist in the restored cluster.
// 2. tables that are created by executing DDL jobs.
// So only tables in an incremental restoration will be added to the map.
func (rc *Client) GenerateRebasedTables(tables []*metautil.Table) (rebasedTablesMap map[UniqueTableName]bool) {
if !rc.IsIncremental() {
// in full restoration, all tables are created by Session.CreateTable, and all tables' info is updated.
rebasedTablesMap = make(map[UniqueTableName]bool)
return
}
return m

rebasedTablesMap = make(map[UniqueTableName]bool, len(tables))
for _, table := range tables {
rebasedTablesMap[UniqueTableName{DB: table.DB.Name.String(), Table: table.Info.Name.String()}] = true
}

return
}

// PreCheckTableTiFlashReplica checks whether TiFlash replica is less than TiFlash node.
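A hedged sketch of how the rebased-tables map could be consumed during restore. The helper here is purely illustrative: this commit changes how the map is generated (from the table list in an incremental restore rather than from DDL jobs), not how it is consumed.

package main

import (
	"fmt"

	"github.com/pingcap/tidb/br/pkg/restore"
)

// needsAutoIDRebase reports whether a table kept its existing auto increment /
// auto random ID cache, in which case the restore has to rebase the IDs
// explicitly instead of relying on the final CREATE TABLE statement.
func needsAutoIDRebase(rebased map[restore.UniqueTableName]bool, db, table string) bool {
	return rebased[restore.UniqueTableName{DB: db, Table: table}]
}

func main() {
	rebased := map[restore.UniqueTableName]bool{
		{DB: "test", Table: "t1"}: true, // e.g. t1 already exists in an incremental restore
	}
	fmt.Println(needsAutoIDRebase(rebased, "test", "t1")) // true
	fmt.Println(needsAutoIDRebase(rebased, "test", "t2")) // false
}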
8 changes: 4 additions & 4 deletions br/pkg/restore/client_test.go
@@ -47,7 +47,7 @@ func (s *testRestoreClientSuite) TearDownTest(c *C) {
}

func (s *testRestoreClientSuite) TestCreateTables(c *C) {
client, err := restore.NewRestoreClient(gluetidb.New(), s.mock.PDClient, s.mock.Storage, nil, defaultKeepaliveCfg)
client, err := restore.NewRestoreClient(gluetidb.New(), s.mock.PDClient, s.mock.Storage, nil, defaultKeepaliveCfg, false)
c.Assert(err, IsNil)

info, err := s.mock.Domain.GetSnapshotInfoSchema(math.MaxUint64)
@@ -103,7 +103,7 @@ func (s *testRestoreClientSuite) TestCreateTables(c *C) {

func (s *testRestoreClientSuite) TestIsOnline(c *C) {

client, err := restore.NewRestoreClient(gluetidb.New(), s.mock.PDClient, s.mock.Storage, nil, defaultKeepaliveCfg)
client, err := restore.NewRestoreClient(gluetidb.New(), s.mock.PDClient, s.mock.Storage, nil, defaultKeepaliveCfg, false)
c.Assert(err, IsNil)

c.Assert(client.IsOnline(), IsFalse)
@@ -113,7 +113,7 @@ func (s *testRestoreClientSuite) TestIsOnline(c *C) {

func (s *testRestoreClientSuite) TestPreCheckTableClusterIndex(c *C) {

client, err := restore.NewRestoreClient(gluetidb.New(), s.mock.PDClient, s.mock.Storage, nil, defaultKeepaliveCfg)
client, err := restore.NewRestoreClient(gluetidb.New(), s.mock.PDClient, s.mock.Storage, nil, defaultKeepaliveCfg, false)
c.Assert(err, IsNil)

info, err := s.mock.Domain.GetSnapshotInfoSchema(math.MaxUint64)
@@ -205,7 +205,7 @@ func (s *testRestoreClientSuite) TestPreCheckTableTiFlashReplicas(c *C) {

client, err := restore.NewRestoreClient(gluetidb.New(), fakePDClient{
stores: mockStores,
}, s.mock.Storage, nil, defaultKeepaliveCfg)
}, s.mock.Storage, nil, defaultKeepaliveCfg, false)
c.Assert(err, IsNil)

tables := make([]*metautil.Table, 4)
