BR: update br keep upstream #26972

Merged (21 commits, Aug 8, 2021)

Commits
1778c1a  Merge commit 'f5053bc8f0b9594cdd5d45a7de8f110399eac49c' as 'br' (3pointer, Aug 5, 2021)
f5053bc  Squashed 'br/' content from commit cf621d7c0 (3pointer, Aug 5, 2021)
4ef36b2  change the import path of br cmd/pkg (3pointer, Aug 4, 2021)
b16b84e  change the failpoint path of br/tests (3pointer, Aug 4, 2021)
93b4d7e  change the import path of br/web br/tools (3pointer, Aug 4, 2021)
82730e7  change the import path of tidb (3pointer, Aug 4, 2021)
b565916  remove cycle depenence in go.mod (3pointer, Aug 4, 2021)
25fb118  update Makefile (3pointer, Aug 4, 2021)
aa686b9  change the import path after subtree update (3pointer, Aug 4, 2021)
968283d  update go mod after merge master (3pointer, Aug 4, 2021)
bac916f  fix the issue that check failed after subtree update to master (3pointer, Aug 5, 2021)
d6b0cdc  remove extra space in MakeFile (3pointer, Aug 5, 2021)
5ac2d5a  reorder the imports pkg in br (3pointer, Aug 5, 2021)
ba848c2  update go.mod after rebase master && kvproto updated (3pointer, Aug 5, 2021)
fe28074  reorder go imports by tidb requirements (3pointer, Aug 5, 2021)
389b4e1  br: reduce sleep seconds for unit test (3pointer, Aug 5, 2021)
b32ce9b  Merge branch 'master' into br_keep_upstream (3pointer, Aug 6, 2021)
cc25ce9  Squashed 'br/' changes from cf621d7c0..3b1308e89 (3pointer, Aug 6, 2021)
ac0327c  Merge commit 'cc25ce9f3fbc6971e757b2c4c856827144477784' into br_keep_… (3pointer, Aug 6, 2021)
7635a36  Merge branch 'master' into br_keep_upstream (ti-chi-bot, Aug 8, 2021)
ae5558c  Merge branch 'master' into br_keep_upstream (ti-chi-bot, Aug 8, 2021)
64 changes: 31 additions & 33 deletions br/pkg/backup/client.go
@@ -421,10 +421,12 @@ func (bc *Client) BackupRanges(
     // we collect all files in a single goroutine to avoid thread safety issues.
     workerPool := utils.NewWorkerPool(concurrency, "Ranges")
     eg, ectx := errgroup.WithContext(ctx)
-    for _, r := range ranges {
+    for id, r := range ranges {
+        id := id
         sk, ek := r.StartKey, r.EndKey
         workerPool.ApplyOnErrorGroup(eg, func() error {
-            err := bc.BackupRange(ectx, sk, ek, req, metaWriter, progressCallBack)
+            elctx := logutil.ContextWithField(ectx, logutil.RedactAny("range-sn", id))
+            err := bc.BackupRange(elctx, sk, ek, req, metaWriter, progressCallBack)
             if err != nil {
                 return errors.Trace(err)
             }
@@ -446,15 +448,14 @@ func (bc *Client) BackupRange(
     start := time.Now()
     defer func() {
         elapsed := time.Since(start)
-        log.Info("backup range finished", zap.Duration("take", elapsed))
+        logutil.CL(ctx).Info("backup range finished", zap.Duration("take", elapsed))
         key := "range start:" + hex.EncodeToString(startKey) + " end:" + hex.EncodeToString(endKey)
         if err != nil {
             summary.CollectFailureUnit(key, err)
         }
     }()
-    log.Info("backup started",
-        logutil.Key("startKey", startKey),
-        logutil.Key("endKey", endKey),
+    logutil.CL(ctx).Info("backup started",
+        logutil.Key("startKey", startKey), logutil.Key("endKey", endKey),
         zap.Uint64("rateLimit", req.RateLimit),
         zap.Uint32("concurrency", req.Concurrency))

@@ -475,7 +476,7 @@
     if err != nil {
         return errors.Trace(err)
     }
-    log.Info("finish backup push down", zap.Int("Ok", results.Len()))
+    logutil.CL(ctx).Info("finish backup push down", zap.Int("small-range-count", results.Len()))

     // Find and backup remaining ranges.
     // TODO: test fine grained backup.
@@ -490,12 +491,12 @@
     progressCallBack(RangeUnit)

     if req.IsRawKv {
-        log.Info("backup raw ranges",
+        logutil.CL(ctx).Info("raw ranges backed up",
             logutil.Key("startKey", startKey),
             logutil.Key("endKey", endKey),
             zap.String("cf", req.Cf))
     } else {
-        log.Info("backup time range",
+        logutil.CL(ctx).Info("time range backed up",
             zap.Reflect("StartVersion", req.StartVersion),
             zap.Reflect("EndVersion", req.EndVersion))
     }
@@ -590,7 +591,7 @@ func (bc *Client) fineGrainedBackup(
     if len(incomplete) == 0 {
         return nil
     }
-    log.Info("start fine grained backup", zap.Int("incomplete", len(incomplete)))
+    logutil.CL(ctx).Info("start fine grained backup", zap.Int("incomplete", len(incomplete)))
     // Step2, retry backup on incomplete range
     respCh := make(chan *backuppb.BackupResponse, 4)
     errCh := make(chan error, 4)
@@ -647,12 +648,12 @@
                 break selectLoop
             }
             if resp.Error != nil {
-                log.Panic("unexpected backup error",
+                logutil.CL(ctx).Panic("unexpected backup error",
                     zap.Reflect("error", resp.Error))
             }
-            log.Info("put fine grained range",
-                logutil.Key("startKey", resp.StartKey),
-                logutil.Key("endKey", resp.EndKey),
+            logutil.CL(ctx).Info("put fine grained range",
+                logutil.Key("fine-grained-range-start", resp.StartKey),
+                logutil.Key("fine-grained-range-end", resp.EndKey),
             )
             rangeTree.Put(resp.StartKey, resp.EndKey, resp.Files)

@@ -780,11 +781,11 @@ func (bc *Client) handleFineGrained(
         if berrors.Is(err, berrors.ErrFailedToConnect) {
             // When the leader store is died,
             // 20s for the default max duration before the raft election timer fires.
-            log.Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID))
+            logutil.CL(ctx).Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID))
             return 20000, nil
         }

-        log.Error("fail to connect store", zap.Uint64("StoreID", storeID))
+        logutil.CL(ctx).Error("fail to connect store", zap.Uint64("StoreID", storeID))
         return 0, errors.Annotatef(err, "failed to connect to store %d", storeID)
     }
     hasProgress := false
hasProgress := false
Expand All @@ -811,17 +812,17 @@ func (bc *Client) handleFineGrained(
return nil
},
func() (backuppb.BackupClient, error) {
log.Warn("reset the connection in handleFineGrained", zap.Uint64("storeID", storeID))
logutil.CL(ctx).Warn("reset the connection in handleFineGrained", zap.Uint64("storeID", storeID))
return bc.mgr.ResetBackupClient(ctx, storeID)
})
if err != nil {
if berrors.Is(err, berrors.ErrFailedToConnect) {
// When the leader store is died,
// 20s for the default max duration before the raft election timer fires.
log.Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID))
logutil.CL(ctx).Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID))
return 20000, nil
}
log.Error("failed to send fine-grained backup", zap.Uint64("storeID", storeID), logutil.ShortError(err))
logutil.CL(ctx).Error("failed to send fine-grained backup", zap.Uint64("storeID", storeID), logutil.ShortError(err))
return 0, errors.Annotatef(err, "failed to send fine-grained backup [%s, %s)",
redact.Key(req.StartKey), redact.Key(req.EndKey))
}
@@ -839,6 +840,7 @@ func (bc *Client) handleFineGrained(
 // Stop receiving response if respFn returns error.
 func SendBackup(
     ctx context.Context,
+    // the `storeID` seems only used for logging now, maybe we can remove it then?
     storeID uint64,
     client backuppb.BackupClient,
     req backuppb.BackupRequest,
@@ -857,14 +859,11 @@ func SendBackup(
     var errReset error
 backupLoop:
     for retry := 0; retry < backupRetryTimes; retry++ {
-        log.Info("try backup",
-            logutil.Key("startKey", req.StartKey),
-            logutil.Key("endKey", req.EndKey),
-            zap.Uint64("storeID", storeID),
+        logutil.CL(ctx).Info("try backup",
             zap.Int("retry time", retry),
         )
         failpoint.Inject("hint-backup-start", func(v failpoint.Value) {
-            log.Info("failpoint hint-backup-start injected, " +
+            logutil.CL(ctx).Info("failpoint hint-backup-start injected, " +
                 "process will notify the shell.")
             if sigFile, ok := v.(string); ok {
                 file, err := os.Create(sigFile)
@@ -880,13 +879,13 @@
         bcli, err := client.Backup(ctx, &req)
         failpoint.Inject("reset-retryable-error", func(val failpoint.Value) {
             if val.(bool) {
-                log.Debug("failpoint reset-retryable-error injected.")
+                logutil.CL(ctx).Debug("failpoint reset-retryable-error injected.")
                 err = status.Error(codes.Unavailable, "Unavailable error")
             }
         })
         failpoint.Inject("reset-not-retryable-error", func(val failpoint.Value) {
             if val.(bool) {
-                log.Debug("failpoint reset-not-retryable-error injected.")
+                logutil.CL(ctx).Debug("failpoint reset-not-retryable-error injected.")
                 err = status.Error(codes.Unknown, "Your server was haunted hence doesn't work, meow :3")
             }
         })
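
The failpoint.Inject markers above are inert in a normal build: the failpoint-ctl tool rewrites them into real branches for test builds, and a test then activates a failpoint by name. A hypothetical activation sketch in Go follows — the exact failpoint path is an assumption based on the new br/pkg/backup layout, not something stated in this PR:

    package backup_test

    import (
        "testing"

        "github.com/pingcap/failpoint"
    )

    func TestRetryableResetPath(t *testing.T) {
        // Assumed name: Go package path of the code under test + "/" + failpoint name.
        fp := "github.com/pingcap/tidb/br/pkg/backup/reset-retryable-error"
        // "return(true)" makes val.(bool) evaluate to true inside the Inject body.
        if err := failpoint.Enable(fp, "return(true)"); err != nil {
            t.Fatal(err)
        }
        // Always disable, or the failpoint leaks into later tests.
        defer func() { _ = failpoint.Disable(fp) }()
        // ... drive SendBackup here and assert that the retry path is taken ...
    }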
@@ -900,7 +899,7 @@
             }
             continue
         }
-        log.Error("fail to backup", zap.Uint64("StoreID", storeID),
+        logutil.CL(ctx).Error("fail to backup", zap.Uint64("StoreID", storeID),
             zap.Int("retry time", retry))
         return berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to create backup stream to store %d", storeID)
     }
@@ -910,9 +909,8 @@
         resp, err := bcli.Recv()
         if err != nil {
             if errors.Cause(err) == io.EOF { // nolint:errorlint
-                log.Info("backup streaming finish",
-                    zap.Uint64("StoreID", storeID),
-                    zap.Int("retry time", retry))
+                logutil.CL(ctx).Info("backup streaming finish",
+                    zap.Int("retry-time", retry))
                 break backupLoop
             }
             if isRetryableError(err) {
@@ -929,9 +927,9 @@
         }

         // TODO: handle errors in the resp.
-        log.Info("range backuped",
-            logutil.Key("startKey", resp.GetStartKey()),
-            logutil.Key("endKey", resp.GetEndKey()))
+        logutil.CL(ctx).Info("range backed up",
+            logutil.Key("small-range-start-key", resp.GetStartKey()),
+            logutil.Key("small-range-end-key", resp.GetEndKey()))
         err = respFn(resp)
         if err != nil {
             return errors.Trace(err)
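
The recurring change in this file replaces the global log package with logutil.CL(ctx), a logger carried in the context.Context, so a field attached once (such as the range-sn added in BackupRanges through logutil.ContextWithField) appears on every log line downstream without being threaded by hand. A minimal self-contained sketch of the pattern with zap — the helper names mirror logutil.ContextWithField and logutil.CL, but this is an illustration, not BR's actual implementation:

    package main

    import (
        "context"

        "go.uber.org/zap"
    )

    type ctxLogKey struct{}

    // contextWithField returns a child context whose logger carries extra fields.
    func contextWithField(ctx context.Context, fields ...zap.Field) context.Context {
        return context.WithValue(ctx, ctxLogKey{}, loggerFromCtx(ctx).With(fields...))
    }

    // loggerFromCtx plays the role of logutil.CL: it returns the context-bound
    // logger, falling back to the global logger when none was attached.
    func loggerFromCtx(ctx context.Context) *zap.Logger {
        if l, ok := ctx.Value(ctxLogKey{}).(*zap.Logger); ok {
            return l
        }
        return zap.L()
    }

    func main() {
        logger, _ := zap.NewDevelopment()
        zap.ReplaceGlobals(logger)

        for id := 0; id < 2; id++ {
            // Attach the field once; every log call made with this context is tagged.
            rctx := contextWithField(context.Background(), zap.Int("range-sn", id))
            loggerFromCtx(rctx).Info("backup range started")
        }
    }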
34 changes: 17 additions & 17 deletions br/pkg/backup/push.go
@@ -12,7 +12,6 @@ import (
     "github.com/pingcap/failpoint"
     backuppb "github.com/pingcap/kvproto/pkg/brpb"
     "github.com/pingcap/kvproto/pkg/metapb"
-    "github.com/pingcap/log"
     berrors "github.com/pingcap/tidb/br/pkg/errors"
     "github.com/pingcap/tidb/br/pkg/logutil"
     "github.com/pingcap/tidb/br/pkg/redact"
@@ -66,30 +65,31 @@ func (push *pushDown) pushBackup(
     // Push down backup tasks to all tikv instances.
     res := rtree.NewRangeTree()
     failpoint.Inject("noop-backup", func(_ failpoint.Value) {
-        log.Warn("skipping normal backup, jump to fine-grained backup, meow :3", logutil.Key("start-key", req.StartKey), logutil.Key("end-key", req.EndKey))
+        logutil.CL(ctx).Warn("skipping normal backup, jump to fine-grained backup, meow :3", logutil.Key("start-key", req.StartKey), logutil.Key("end-key", req.EndKey))
         failpoint.Return(res, nil)
     })

     wg := new(sync.WaitGroup)
     for _, s := range stores {
         store := s
         storeID := s.GetId()
+        lctx := logutil.ContextWithField(ctx, zap.Uint64("store-id", storeID))
         if s.GetState() != metapb.StoreState_Up {
-            log.Warn("skip store", zap.Uint64("StoreID", storeID), zap.Stringer("State", s.GetState()))
+            logutil.CL(lctx).Warn("skip store", zap.Stringer("State", s.GetState()))
             continue
         }
-        client, err := push.mgr.GetBackupClient(ctx, storeID)
+        client, err := push.mgr.GetBackupClient(lctx, storeID)
         if err != nil {
             // BR should be able to backup even some of stores disconnected.
             // The regions managed by this store can be retried at fine-grained backup then.
-            log.Warn("fail to connect store, skipping", zap.Uint64("StoreID", storeID), zap.Error(err))
+            logutil.CL(lctx).Warn("fail to connect store, skipping", zap.Error(err))
             return res, nil
         }
         wg.Add(1)
         go func() {
             defer wg.Done()
             err := SendBackup(
-                ctx, storeID, client, req,
+                lctx, storeID, client, req,
                 func(resp *backuppb.BackupResponse) error {
                     // Forward all responses (including error).
                     push.respCh <- responseAndStore{
@@ -99,8 +99,8 @@ func (push *pushDown) pushBackup(
                     return nil
                 },
                 func() (backuppb.BackupClient, error) {
-                    log.Warn("reset the connection in push", zap.Uint64("storeID", storeID))
-                    return push.mgr.ResetBackupClient(ctx, storeID)
+                    logutil.CL(lctx).Warn("reset the connection in push")
+                    return push.mgr.ResetBackupClient(lctx, storeID)
                 })
             // Disconnected stores can be ignored.
             if err != nil {
@@ -127,14 +127,14 @@
         }
         failpoint.Inject("backup-storage-error", func(val failpoint.Value) {
             msg := val.(string)
-            log.Debug("failpoint backup-storage-error injected.", zap.String("msg", msg))
+            logutil.CL(ctx).Debug("failpoint backup-storage-error injected.", zap.String("msg", msg))
             resp.Error = &backuppb.Error{
                 Msg: msg,
             }
         })
         failpoint.Inject("tikv-rw-error", func(val failpoint.Value) {
             msg := val.(string)
-            log.Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg))
+            logutil.CL(ctx).Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg))
             resp.Error = &backuppb.Error{
                 Msg: msg,
             }
@@ -150,28 +150,28 @@
         errPb := resp.GetError()
         switch v := errPb.Detail.(type) {
         case *backuppb.Error_KvError:
-            log.Warn("backup occur kv error", zap.Reflect("error", v))
+            logutil.CL(ctx).Warn("backup occur kv error", zap.Reflect("error", v))

         case *backuppb.Error_RegionError:
-            log.Warn("backup occur region error", zap.Reflect("error", v))
+            logutil.CL(ctx).Warn("backup occur region error", zap.Reflect("error", v))

         case *backuppb.Error_ClusterIdError:
-            log.Error("backup occur cluster ID error", zap.Reflect("error", v))
+            logutil.CL(ctx).Error("backup occur cluster ID error", zap.Reflect("error", v))
             return res, errors.Annotatef(berrors.ErrKVClusterIDMismatch, "%v", errPb)
         default:
             if utils.MessageIsRetryableStorageError(errPb.GetMsg()) {
-                log.Warn("backup occur storage error", zap.String("error", errPb.GetMsg()))
+                logutil.CL(ctx).Warn("backup occur storage error", zap.String("error", errPb.GetMsg()))
                 continue
             }
             if utils.MessageIsNotFoundStorageError(errPb.GetMsg()) {
                 errMsg := fmt.Sprintf("File or directory not found error occurs on TiKV Node(store id: %v; Address: %s)", store.GetId(), redact.String(store.GetAddress()))
-                log.Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg),
+                logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg),
                     zap.String("work around", "please ensure br and tikv node share a same disk and the user of br and tikv has same uid."))
             }

             if utils.MessageIsPermissionDeniedStorageError(errPb.GetMsg()) {
                 errMsg := fmt.Sprintf("I/O permission denied error occurs on TiKV Node(store id: %v; Address: %s)", store.GetId(), redact.String(store.GetAddress()))
-                log.Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg),
+                logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg),
                     zap.String("work around", "please ensure tikv has permission to read from & write to the storage."))
             }
             return res, berrors.ErrKVStorage
@@ -181,7 +181,7 @@ func (push *pushDown) pushBackup(
             if !berrors.Is(err, berrors.ErrFailedToConnect) {
                 return res, errors.Annotatef(err, "failed to backup range [%s, %s)", redact.Key(req.StartKey), redact.Key(req.EndKey))
             }
-            log.Warn("skipping disconnected stores", logutil.ShortError(err))
+            logutil.CL(ctx).Warn("skipping disconnected stores", logutil.ShortError(err))
             return res, nil
         }
     }
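
pushBackup applies the same idea per store: lctx binds store-id to the context once, and every logutil.CL(lctx) call inside the goroutine is tagged automatically. Note also the store := s copy here and the id := id copy in BackupRanges earlier: each closure handed to a goroutine or worker pool must capture a per-iteration copy. A minimal sketch of why the copy matters (before Go 1.22, every closure in a loop otherwise shared the same loop variable):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var wg sync.WaitGroup
        for id := 0; id < 3; id++ {
            id := id // per-iteration copy; without it (pre-Go 1.22) all goroutines may see the final value
            wg.Add(1)
            go func() {
                defer wg.Done()
                fmt.Println("worker for range", id) // prints 0, 1, 2 (in some order)
            }()
        }
        wg.Wait()
    }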
14 changes: 7 additions & 7 deletions br/pkg/lightning/backend/kv/sql2kv.go
@@ -120,17 +120,17 @@ func autoRandomIncrementBits(col *table.Column, randomBits int) int {
 }

 // collectGeneratedColumns collects all expressions required to evaluate the
-// results of all stored generated columns. The returning slice is in evaluation
-// order.
+// results of all generated columns. The returning slice is in evaluation order.
 func collectGeneratedColumns(se *session, meta *model.TableInfo, cols []*table.Column) ([]genCol, error) {
-    maxGenColOffset := -1
+    hasGenCol := false
     for _, col := range cols {
-        if col.GeneratedStored && col.Offset > maxGenColOffset {
-            maxGenColOffset = col.Offset
+        if col.GeneratedExpr != nil {
+            hasGenCol = true
+            break
         }
     }

-    if maxGenColOffset < 0 {
+    if !hasGenCol {
         return nil, nil
     }
@@ -165,7 +165,7 @@ func collectGeneratedColumns(se *session, meta *model.TableInfo, cols []*table.Column) ([]genCol, error) {
     // for simplicity we just evaluate all generated columns (virtual or not) before the last stored one.
     var genCols []genCol
     for i, col := range cols {
-        if col.GeneratedExpr != nil && col.Offset <= maxGenColOffset {
+        if col.GeneratedExpr != nil {
             expr, err := expression.RewriteAstExpr(se, col.GeneratedExpr, schema, names)
             if err != nil {
                 return nil, err
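
The sql2kv.go change widens collectGeneratedColumns: the old code keyed collection off the offset of the last stored generated column, so a table whose generated columns were all virtual got nil back and their expressions were never collected; the new code collects expressions whenever any generated column exists. A self-contained sketch contrasting the two predicates over a mock column list — the col struct is a made-up stand-in for table.Column, used only to show the case the old logic missed:

    package main

    import "fmt"

    // col is a cut-down stand-in for table.Column (illustrative only).
    type col struct {
        name      string
        offset    int
        stored    bool // corresponds to GeneratedStored
        generated bool // corresponds to GeneratedExpr != nil
    }

    func main() {
        // A table whose only generated column is virtual, e.g.:
        //   CREATE TABLE t (a INT, b INT AS (a + 1) VIRTUAL);
        cols := []col{{"a", 0, false, false}, {"b", 1, false, true}}

        // Old predicate: track the offset of the last STORED generated column.
        maxGenColOffset := -1
        for _, c := range cols {
            if c.stored && c.offset > maxGenColOffset {
                maxGenColOffset = c.offset
            }
        }
        fmt.Println("old logic collects:", maxGenColOffset >= 0) // false: b is skipped

        // New predicate: any generated column (stored or virtual) triggers collection.
        hasGenCol := false
        for _, c := range cols {
            if c.generated {
                hasGenCol = true
                break
            }
        }
        fmt.Println("new logic collects:", hasGenCol) // true: b is evaluated
    }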