Commit 6dc835d

Merge branch 'master' into add_bit_push_switch

Yisaer authored Mar 4, 2022
2 parents 43bce8f + 493eb45 commit 6dc835d
Showing 92 changed files with 4,004 additions and 2,004 deletions.
18 changes: 13 additions & 5 deletions br/pkg/backup/client.go
@@ -545,7 +545,7 @@ func (bc *Client) BackupRange(
// TODO: test fine grained backup.
err = bc.fineGrainedBackup(
ctx, startKey, endKey, req.StartVersion, req.EndVersion, req.CompressionType, req.CompressionLevel,
req.RateLimit, req.Concurrency, results, progressCallBack)
req.RateLimit, req.Concurrency, req.IsRawKv, req.CipherInfo, results, progressCallBack)
if err != nil {
return errors.Trace(err)
}
@@ -589,10 +589,12 @@ func (bc *Client) BackupRange(
return nil
}

func (bc *Client) findRegionLeader(ctx context.Context, key []byte) (*metapb.Peer, error) {
func (bc *Client) findRegionLeader(ctx context.Context, key []byte, isRawKv bool) (*metapb.Peer, error) {
// Keys are saved in encoded format in TiKV, so the key must be encoded
// in order to find the correct region.
key = codec.EncodeBytes([]byte{}, key)
if !isRawKv {
key = codec.EncodeBytes([]byte{}, key)
}
for i := 0; i < 5; i++ {
// better backoff.
region, err := bc.mgr.GetPDClient().GetRegion(ctx, key)
@@ -623,6 +625,8 @@ func (bc *Client) fineGrainedBackup(
compressLevel int32,
rateLimit uint64,
concurrency uint32,
isRawKv bool,
cipherInfo *backuppb.CipherInfo,
rangeTree rtree.RangeTree,
progressCallBack func(ProgressUnit),
) error {
@@ -673,7 +677,7 @@ func (bc *Client) fineGrainedBackup(
for rg := range retry {
backoffMs, err :=
bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS,
compressType, compressLevel, rateLimit, concurrency, respCh)
compressType, compressLevel, rateLimit, concurrency, isRawKv, cipherInfo, respCh)
if err != nil {
errCh <- err
return
@@ -820,9 +824,11 @@ func (bc *Client) handleFineGrained(
compressionLevel int32,
rateLimit uint64,
concurrency uint32,
isRawKv bool,
cipherInfo *backuppb.CipherInfo,
respCh chan<- *backuppb.BackupResponse,
) (int, error) {
leader, pderr := bc.findRegionLeader(ctx, rg.StartKey)
leader, pderr := bc.findRegionLeader(ctx, rg.StartKey, isRawKv)
if pderr != nil {
return 0, errors.Trace(pderr)
}
@@ -837,8 +843,10 @@
StorageBackend: bc.backend,
RateLimit: rateLimit,
Concurrency: concurrency,
IsRawKv: isRawKv,
CompressionType: compressType,
CompressionLevel: compressionLevel,
CipherInfo: cipherInfo,
}
lockResolver := bc.mgr.GetLockResolver()
client, err := bc.mgr.GetBackupClient(ctx, storeID)
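Note: the client.go hunks above thread two new request fields, IsRawKv and CipherInfo, through fineGrainedBackup and handleFineGrained. The change in findRegionLeader matters because raw KV backups address regions by the raw user key, while transactional backups must look regions up by the memcomparable-encoded key that TiKV stores internally. A minimal, illustrative sketch of that decision (not the actual BR code; the helper name is made up):

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/util/codec"
)

// regionLookupKey (hypothetical helper) returns the key to send to PD when
// locating a region: raw KV mode uses the key as-is, transactional mode uses
// the memcomparable encoding that TiKV stores internally.
func regionLookupKey(key []byte, isRawKv bool) []byte {
	if isRawKv {
		return key
	}
	return codec.EncodeBytes([]byte{}, key)
}

func main() {
	fmt.Printf("raw:     %q\n", regionLookupKey([]byte("a"), true))
	fmt.Printf("encoded: %q\n", regionLookupKey([]byte("a"), false))
}
```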
17 changes: 17 additions & 0 deletions br/pkg/backup/push.go
@@ -11,6 +11,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/metapb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
@@ -116,6 +117,7 @@ func (push *pushDown) pushBackup(
close(push.respCh)
}()

regionErrorIngestedOnce := false
for {
select {
case respAndStore, ok := <-push.respCh:
@@ -139,6 +141,21 @@
Msg: msg,
}
})
failpoint.Inject("tikv-region-error", func(val failpoint.Value) {
if !regionErrorIngestedOnce {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint tikv-regionh-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
// Msg: msg,
Detail: &backuppb.Error_RegionError{
RegionError: &errorpb.Error{
Message: msg,
},
},
}
}
regionErrorIngestedOnce = true
})
if resp.GetError() == nil {
// None error means range has been backuped successfully.
res.Put(
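Note: the push.go hunk adds a tikv-region-error failpoint that wraps the injected message in a RegionError, so the retry path can be exercised without a real TiKV fault; the regionErrorIngestedOnce flag makes the injection fire only once. A hedged sketch of how a test might enable it; the full failpoint path is inferred from the package location and is not part of this diff:

```go
package backup_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

func TestRegionErrorFailpoint(t *testing.T) {
	// Assumed failpoint path: Go package path + "/" + failpoint name.
	fp := "github.com/pingcap/tidb/br/pkg/backup/tikv-region-error"
	// The return value becomes `val` inside failpoint.Inject and ends up as
	// the injected errorpb.Error's Message.
	require.NoError(t, failpoint.Enable(fp, `return("injected by test")`))
	defer func() {
		require.NoError(t, failpoint.Disable(fp))
	}()
	// ... run pushBackup here and assert that the first response carries a
	// region error and that later responses succeed.
}
```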
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/local.go
@@ -258,7 +258,7 @@ func NewLocalBackend(
if err != nil {
return backend.MakeBackend(nil), errors.Annotate(err, "construct pd client failed")
}
splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig())
splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig(), false)

shouldCreate := true
if cfg.Checkpoint.Enable {
163 changes: 117 additions & 46 deletions br/pkg/lightning/checkpoints/checkpoints.go
@@ -21,8 +21,9 @@ import (
"fmt"
"io"
"math"
"os"
"path"
"sort"
"strings"
"sync"

"github.com/joho/sqltocsv"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
verify "github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/version/build"
"go.uber.org/zap"
"modernc.org/mathutil"
@@ -525,7 +527,11 @@ func OpenCheckpointsDB(ctx context.Context, cfg *config.Config) (DB, error) {
return cpdb, nil

case config.CheckpointDriverFile:
return NewFileCheckpointsDB(cfg.Checkpoint.DSN), nil
cpdb, err := NewFileCheckpointsDB(ctx, cfg.Checkpoint.DSN)
if err != nil {
return nil, errors.Trace(err)
}
return cpdb, nil

default:
return nil, errors.Errorf("Unknown checkpoint driver %s", cfg.Checkpoint.Driver)
@@ -556,13 +562,15 @@ func IsCheckpointsDBExists(ctx context.Context, cfg *config.Config) (bool, error
return result, nil

case config.CheckpointDriverFile:
_, err := os.Stat(cfg.Checkpoint.DSN)
if err == nil {
return true, err
} else if os.IsNotExist(err) {
return false, nil
s, fileName, err := createExstorageByCompletePath(ctx, cfg.Checkpoint.DSN)
if err != nil {
return false, errors.Trace(err)
}
return false, errors.Trace(err)
result, err := s.FileExists(ctx, fileName)
if err != nil {
return false, errors.Trace(err)
}
return result, nil

default:
return false, errors.Errorf("Unknown checkpoint driver %s", cfg.Checkpoint.Driver)
@@ -940,63 +948,126 @@ func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoi
type FileCheckpointsDB struct {
lock sync.Mutex // we need to ensure only a thread can access to `checkpoints` at a time
checkpoints checkpointspb.CheckpointsModel
ctx context.Context
path string
fileName string
exStorage storage.ExternalStorage
}

func NewFileCheckpointsDB(path string) *FileCheckpointsDB {
func NewFileCheckpointsDB(ctx context.Context, path string) (*FileCheckpointsDB, error) {
cpdb := &FileCheckpointsDB{
path: path,
checkpoints: checkpointspb.CheckpointsModel{
TaskCheckpoint: &checkpointspb.TaskCheckpointModel{},
Checkpoints: map[string]*checkpointspb.TableCheckpointModel{},
},
}
// ignore all errors -- file maybe not created yet (and it is fine).
content, err := os.ReadFile(path)
if err == nil {
err2 := cpdb.checkpoints.Unmarshal(content)
if err2 != nil {
log.L().Error("checkpoint file is broken", zap.String("path", path), zap.Error(err2))
}
// FIXME: patch for empty map may need initialize manually, because currently
// FIXME: a map of zero size -> marshall -> unmarshall -> become nil, see checkpoint_test.go
if cpdb.checkpoints.Checkpoints == nil {
cpdb.checkpoints.Checkpoints = map[string]*checkpointspb.TableCheckpointModel{}
}
for _, table := range cpdb.checkpoints.Checkpoints {
if table.Engines == nil {
table.Engines = map[int32]*checkpointspb.EngineCheckpointModel{}
}
for _, engine := range table.Engines {
if engine.Chunks == nil {
engine.Chunks = map[string]*checkpointspb.ChunkCheckpointModel{}
}
}
}
} else {

// init ExternalStorage
s, fileName, err := createExstorageByCompletePath(ctx, path)
if err != nil {
return nil, errors.Trace(err)
}
cpdb.ctx = ctx
cpdb.fileName = fileName
cpdb.exStorage = s

if cpdb.fileName == "" {
return nil, errors.Errorf("the checkpoint DSN '%s' must not be a directory", path)
}

exist, err := cpdb.exStorage.FileExists(ctx, cpdb.fileName)
if err != nil {
return nil, errors.Trace(err)
}
if !exist {
log.L().Info("open checkpoint file failed, going to create a new one",
zap.String("path", path),
log.ShortError(err),
)
return cpdb, nil
}
content, err := cpdb.exStorage.ReadFile(ctx, cpdb.fileName)
if err != nil {
return nil, errors.Trace(err)
}
err = cpdb.checkpoints.Unmarshal(content)
if err != nil {
log.L().Error("checkpoint file is broken", zap.String("path", path), zap.Error(err))
}
// FIXME: patch for empty map may need initialize manually, because currently
// FIXME: a map of zero size -> marshall -> unmarshall -> become nil, see checkpoint_test.go
if cpdb.checkpoints.Checkpoints == nil {
cpdb.checkpoints.Checkpoints = map[string]*checkpointspb.TableCheckpointModel{}
}
for _, table := range cpdb.checkpoints.Checkpoints {
if table.Engines == nil {
table.Engines = map[int32]*checkpointspb.EngineCheckpointModel{}
}
for _, engine := range table.Engines {
if engine.Chunks == nil {
engine.Chunks = map[string]*checkpointspb.ChunkCheckpointModel{}
}
}
}
return cpdb
return cpdb, nil
}

func (cpdb *FileCheckpointsDB) save() error {
serialized, err := cpdb.checkpoints.Marshal()
// createExstorageByCompletePath create ExternalStorage by completePath and return fileName.
func createExstorageByCompletePath(ctx context.Context, completePath string) (storage.ExternalStorage, string, error) {
if completePath == "" {
return nil, "", nil
}
fileName, newPath, err := separateCompletePath(completePath)
if err != nil {
return errors.Trace(err)
return nil, "", errors.Trace(err)
}
// because `os.WriteFile` is not atomic, directly write into it may reset the file
// to an empty file if write is not finished.
tmpPath := cpdb.path + ".tmp"
if err := os.WriteFile(tmpPath, serialized, 0o600); err != nil {
return errors.Trace(err)
u, err := storage.ParseBackend(newPath, nil)
if err != nil {
return nil, "", errors.Trace(err)
}
s, err := storage.New(ctx, u, &storage.ExternalStorageOptions{})
if err != nil {
return nil, "", errors.Trace(err)
}
if err := os.Rename(tmpPath, cpdb.path); err != nil {
return s, fileName, nil
}

// separateCompletePath separates fileName from completePath, returns fileName and newPath.
func separateCompletePath(completePath string) (string, string, error) {
if completePath == "" {
return "", "", nil
}
var fileName, newPath string
purl, err := storage.ParseRawURL(completePath)
if err != nil {
return "", "", errors.Trace(err)
}
// not url format, we don't use url library to avoid being escaped or unescaped
if purl.Scheme == "" {
// no fileName, just path
if strings.HasSuffix(completePath, "/") {
return "", completePath, nil
}
fileName = path.Base(completePath)
newPath = path.Dir(completePath)
} else {
if strings.HasSuffix(purl.Path, "/") {
return "", completePath, nil
}
fileName = path.Base(purl.Path)
purl.Path = path.Dir(purl.Path)
newPath = purl.String()
}
return fileName, newPath, nil
}

func (cpdb *FileCheckpointsDB) save() error {
serialized, err := cpdb.checkpoints.Marshal()
if err != nil {
return errors.Trace(err)
}
return nil
return cpdb.exStorage.WriteFile(cpdb.ctx, cpdb.fileName, serialized)
}

func (cpdb *FileCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error {
@@ -1529,7 +1600,7 @@ func (cpdb *FileCheckpointsDB) RemoveCheckpoint(_ context.Context, tableName str

if tableName == allTables {
cpdb.checkpoints.Reset()
return errors.Trace(os.Remove(cpdb.path))
return errors.Trace(cpdb.exStorage.DeleteFile(cpdb.ctx, cpdb.fileName))
}

delete(cpdb.checkpoints.Checkpoints, tableName)
@@ -1540,8 +1611,8 @@ func (cpdb *FileCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64
cpdb.lock.Lock()
defer cpdb.lock.Unlock()

newPath := fmt.Sprintf("%s.%d.bak", cpdb.path, taskID)
return errors.Trace(os.Rename(cpdb.path, newPath))
newFileName := fmt.Sprintf("%s.%d.bak", cpdb.fileName, taskID)
return cpdb.exStorage.Rename(cpdb.ctx, cpdb.fileName, newFileName)
}

func (cpdb *FileCheckpointsDB) GetLocalStoringTables(_ context.Context) (map[string][]int32, error) {
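Note: the checkpoints.go changes switch the file checkpoint driver from direct os calls to br/pkg/storage, so the checkpoint DSN can be a local path or any supported external-storage URL. separateCompletePath peels the file name off the DSN and hands the remaining directory (or URL) to storage.ParseBackend. A self-contained sketch of that splitting, re-implemented with the standard library for illustration only (the real code uses storage.ParseRawURL to avoid URL escaping issues):

```go
package main

import (
	"fmt"
	"net/url"
	"path"
	"strings"
)

// splitDSN mirrors the behaviour of separateCompletePath: it returns the file
// name and the remaining directory path (or URL) for both plain paths and
// URL-style DSNs such as s3://bucket/prefix/cp.pb.
func splitDSN(completePath string) (fileName, newPath string, err error) {
	if completePath == "" {
		return "", "", nil
	}
	purl, err := url.Parse(completePath)
	if err != nil {
		return "", "", err
	}
	if purl.Scheme == "" {
		// Plain path; a trailing slash means "directory only, no file name".
		if strings.HasSuffix(completePath, "/") {
			return "", completePath, nil
		}
		return path.Base(completePath), path.Dir(completePath), nil
	}
	if strings.HasSuffix(purl.Path, "/") {
		return "", completePath, nil
	}
	fileName = path.Base(purl.Path)
	purl.Path = path.Dir(purl.Path)
	return fileName, purl.String(), nil
}

func main() {
	f, p, _ := splitDSN("/tmp/lightning/cp.pb")
	fmt.Println(f, p) // cp.pb /tmp/lightning
	f, p, _ = splitDSN("s3://bucket/prefix/cp.pb")
	fmt.Println(f, p) // cp.pb s3://bucket/prefix
}
```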
5 changes: 3 additions & 2 deletions br/pkg/lightning/checkpoints/checkpoints_file_test.go
@@ -28,12 +28,13 @@ func newTestConfig() *config.Config {

func newFileCheckpointsDB(t *testing.T) (*checkpoints.FileCheckpointsDB, func()) {
dir := t.TempDir()
cpdb := checkpoints.NewFileCheckpointsDB(filepath.Join(dir, "cp.pb"))
ctx := context.Background()
cpdb, err := checkpoints.NewFileCheckpointsDB(ctx, filepath.Join(dir, "cp.pb"))
require.NoError(t, err)

// 2. initialize with checkpoint data.
cfg := newTestConfig()
err := cpdb.Initialize(ctx, cfg, map[string]*checkpoints.TidbDBInfo{
err = cpdb.Initialize(ctx, cfg, map[string]*checkpoints.TidbDBInfo{
"db1": {
Name: "db1",
Tables: map[string]*checkpoints.TidbTableInfo{