Skip to content

Commit

Permalink
cherry pick pingcap#26972 to release-5.2
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
3pointer committed Aug 18, 2021
1 parent 3523b5d commit a648667
Show file tree
Hide file tree
Showing 33 changed files with 1,117 additions and 401 deletions.
64 changes: 31 additions & 33 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,10 +421,12 @@ func (bc *Client) BackupRanges(
// we collect all files in a single goroutine to avoid thread safety issues.
workerPool := utils.NewWorkerPool(concurrency, "Ranges")
eg, ectx := errgroup.WithContext(ctx)
for _, r := range ranges {
for id, r := range ranges {
id := id
sk, ek := r.StartKey, r.EndKey
workerPool.ApplyOnErrorGroup(eg, func() error {
err := bc.BackupRange(ectx, sk, ek, req, metaWriter, progressCallBack)
elctx := logutil.ContextWithField(ectx, logutil.RedactAny("range-sn", id))
err := bc.BackupRange(elctx, sk, ek, req, metaWriter, progressCallBack)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -446,15 +448,14 @@ func (bc *Client) BackupRange(
start := time.Now()
defer func() {
elapsed := time.Since(start)
log.Info("backup range finished", zap.Duration("take", elapsed))
logutil.CL(ctx).Info("backup range finished", zap.Duration("take", elapsed))
key := "range start:" + hex.EncodeToString(startKey) + " end:" + hex.EncodeToString(endKey)
if err != nil {
summary.CollectFailureUnit(key, err)
}
}()
log.Info("backup started",
logutil.Key("startKey", startKey),
logutil.Key("endKey", endKey),
logutil.CL(ctx).Info("backup started",
logutil.Key("startKey", startKey), logutil.Key("endKey", endKey),
zap.Uint64("rateLimit", req.RateLimit),
zap.Uint32("concurrency", req.Concurrency))

Expand All @@ -475,7 +476,7 @@ func (bc *Client) BackupRange(
if err != nil {
return errors.Trace(err)
}
log.Info("finish backup push down", zap.Int("Ok", results.Len()))
logutil.CL(ctx).Info("finish backup push down", zap.Int("small-range-count", results.Len()))

// Find and backup remaining ranges.
// TODO: test fine grained backup.
Expand All @@ -490,12 +491,12 @@ func (bc *Client) BackupRange(
progressCallBack(RangeUnit)

if req.IsRawKv {
log.Info("backup raw ranges",
logutil.CL(ctx).Info("raw ranges backed up",
logutil.Key("startKey", startKey),
logutil.Key("endKey", endKey),
zap.String("cf", req.Cf))
} else {
log.Info("backup time range",
logutil.CL(ctx).Info("time range backed up",
zap.Reflect("StartVersion", req.StartVersion),
zap.Reflect("EndVersion", req.EndVersion))
}
Expand Down Expand Up @@ -590,7 +591,7 @@ func (bc *Client) fineGrainedBackup(
if len(incomplete) == 0 {
return nil
}
log.Info("start fine grained backup", zap.Int("incomplete", len(incomplete)))
logutil.CL(ctx).Info("start fine grained backup", zap.Int("incomplete", len(incomplete)))
// Step2, retry backup on incomplete range
respCh := make(chan *backuppb.BackupResponse, 4)
errCh := make(chan error, 4)
Expand Down Expand Up @@ -647,12 +648,12 @@ func (bc *Client) fineGrainedBackup(
break selectLoop
}
if resp.Error != nil {
log.Panic("unexpected backup error",
logutil.CL(ctx).Panic("unexpected backup error",
zap.Reflect("error", resp.Error))
}
log.Info("put fine grained range",
logutil.Key("startKey", resp.StartKey),
logutil.Key("endKey", resp.EndKey),
logutil.CL(ctx).Info("put fine grained range",
logutil.Key("fine-grained-range-start", resp.StartKey),
logutil.Key("fine-grained-range-end", resp.EndKey),
)
rangeTree.Put(resp.StartKey, resp.EndKey, resp.Files)

Expand Down Expand Up @@ -780,11 +781,11 @@ func (bc *Client) handleFineGrained(
if berrors.Is(err, berrors.ErrFailedToConnect) {
// When the leader store is died,
// 20s for the default max duration before the raft election timer fires.
log.Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID))
logutil.CL(ctx).Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID))
return 20000, nil
}

log.Error("fail to connect store", zap.Uint64("StoreID", storeID))
logutil.CL(ctx).Error("fail to connect store", zap.Uint64("StoreID", storeID))
return 0, errors.Annotatef(err, "failed to connect to store %d", storeID)
}
hasProgress := false
Expand All @@ -811,17 +812,17 @@ func (bc *Client) handleFineGrained(
return nil
},
func() (backuppb.BackupClient, error) {
log.Warn("reset the connection in handleFineGrained", zap.Uint64("storeID", storeID))
logutil.CL(ctx).Warn("reset the connection in handleFineGrained", zap.Uint64("storeID", storeID))
return bc.mgr.ResetBackupClient(ctx, storeID)
})
if err != nil {
if berrors.Is(err, berrors.ErrFailedToConnect) {
// When the leader store is died,
// 20s for the default max duration before the raft election timer fires.
log.Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID))
logutil.CL(ctx).Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID))
return 20000, nil
}
log.Error("failed to send fine-grained backup", zap.Uint64("storeID", storeID), logutil.ShortError(err))
logutil.CL(ctx).Error("failed to send fine-grained backup", zap.Uint64("storeID", storeID), logutil.ShortError(err))
return 0, errors.Annotatef(err, "failed to send fine-grained backup [%s, %s)",
redact.Key(req.StartKey), redact.Key(req.EndKey))
}
Expand All @@ -839,6 +840,7 @@ func (bc *Client) handleFineGrained(
// Stop receiving response if respFn returns error.
func SendBackup(
ctx context.Context,
// the `storeID` seems only used for logging now, maybe we can remove it then?
storeID uint64,
client backuppb.BackupClient,
req backuppb.BackupRequest,
Expand All @@ -857,14 +859,11 @@ func SendBackup(
var errReset error
backupLoop:
for retry := 0; retry < backupRetryTimes; retry++ {
log.Info("try backup",
logutil.Key("startKey", req.StartKey),
logutil.Key("endKey", req.EndKey),
zap.Uint64("storeID", storeID),
logutil.CL(ctx).Info("try backup",
zap.Int("retry time", retry),
)
failpoint.Inject("hint-backup-start", func(v failpoint.Value) {
log.Info("failpoint hint-backup-start injected, " +
logutil.CL(ctx).Info("failpoint hint-backup-start injected, " +
"process will notify the shell.")
if sigFile, ok := v.(string); ok {
file, err := os.Create(sigFile)
Expand All @@ -880,13 +879,13 @@ backupLoop:
bcli, err := client.Backup(ctx, &req)
failpoint.Inject("reset-retryable-error", func(val failpoint.Value) {
if val.(bool) {
log.Debug("failpoint reset-retryable-error injected.")
logutil.CL(ctx).Debug("failpoint reset-retryable-error injected.")
err = status.Error(codes.Unavailable, "Unavailable error")
}
})
failpoint.Inject("reset-not-retryable-error", func(val failpoint.Value) {
if val.(bool) {
log.Debug("failpoint reset-not-retryable-error injected.")
logutil.CL(ctx).Debug("failpoint reset-not-retryable-error injected.")
err = status.Error(codes.Unknown, "Your server was haunted hence doesn't work, meow :3")
}
})
Expand All @@ -900,7 +899,7 @@ backupLoop:
}
continue
}
log.Error("fail to backup", zap.Uint64("StoreID", storeID),
logutil.CL(ctx).Error("fail to backup", zap.Uint64("StoreID", storeID),
zap.Int("retry time", retry))
return berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to create backup stream to store %d", storeID)
}
Expand All @@ -910,9 +909,8 @@ backupLoop:
resp, err := bcli.Recv()
if err != nil {
if errors.Cause(err) == io.EOF { // nolint:errorlint
log.Info("backup streaming finish",
zap.Uint64("StoreID", storeID),
zap.Int("retry time", retry))
logutil.CL(ctx).Info("backup streaming finish",
zap.Int("retry-time", retry))
break backupLoop
}
if isRetryableError(err) {
Expand All @@ -929,9 +927,9 @@ backupLoop:
}

// TODO: handle errors in the resp.
log.Info("range backuped",
logutil.Key("startKey", resp.GetStartKey()),
logutil.Key("endKey", resp.GetEndKey()))
logutil.CL(ctx).Info("range backed up",
logutil.Key("small-range-start-key", resp.GetStartKey()),
logutil.Key("small-range-end-key", resp.GetEndKey()))
err = respFn(resp)
if err != nil {
return errors.Trace(err)
Expand Down
34 changes: 17 additions & 17 deletions br/pkg/backup/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/redact"
Expand Down Expand Up @@ -66,30 +65,31 @@ func (push *pushDown) pushBackup(
// Push down backup tasks to all tikv instances.
res := rtree.NewRangeTree()
failpoint.Inject("noop-backup", func(_ failpoint.Value) {
log.Warn("skipping normal backup, jump to fine-grained backup, meow :3", logutil.Key("start-key", req.StartKey), logutil.Key("end-key", req.EndKey))
logutil.CL(ctx).Warn("skipping normal backup, jump to fine-grained backup, meow :3", logutil.Key("start-key", req.StartKey), logutil.Key("end-key", req.EndKey))
failpoint.Return(res, nil)
})

wg := new(sync.WaitGroup)
for _, s := range stores {
store := s
storeID := s.GetId()
lctx := logutil.ContextWithField(ctx, zap.Uint64("store-id", storeID))
if s.GetState() != metapb.StoreState_Up {
log.Warn("skip store", zap.Uint64("StoreID", storeID), zap.Stringer("State", s.GetState()))
logutil.CL(lctx).Warn("skip store", zap.Stringer("State", s.GetState()))
continue
}
client, err := push.mgr.GetBackupClient(ctx, storeID)
client, err := push.mgr.GetBackupClient(lctx, storeID)
if err != nil {
// BR should be able to backup even some of stores disconnected.
// The regions managed by this store can be retried at fine-grained backup then.
log.Warn("fail to connect store, skipping", zap.Uint64("StoreID", storeID), zap.Error(err))
logutil.CL(lctx).Warn("fail to connect store, skipping", zap.Error(err))
return res, nil
}
wg.Add(1)
go func() {
defer wg.Done()
err := SendBackup(
ctx, storeID, client, req,
lctx, storeID, client, req,
func(resp *backuppb.BackupResponse) error {
// Forward all responses (including error).
push.respCh <- responseAndStore{
Expand All @@ -99,8 +99,8 @@ func (push *pushDown) pushBackup(
return nil
},
func() (backuppb.BackupClient, error) {
log.Warn("reset the connection in push", zap.Uint64("storeID", storeID))
return push.mgr.ResetBackupClient(ctx, storeID)
logutil.CL(lctx).Warn("reset the connection in push")
return push.mgr.ResetBackupClient(lctx, storeID)
})
// Disconnected stores can be ignored.
if err != nil {
Expand All @@ -127,14 +127,14 @@ func (push *pushDown) pushBackup(
}
failpoint.Inject("backup-storage-error", func(val failpoint.Value) {
msg := val.(string)
log.Debug("failpoint backup-storage-error injected.", zap.String("msg", msg))
logutil.CL(ctx).Debug("failpoint backup-storage-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Msg: msg,
}
})
failpoint.Inject("tikv-rw-error", func(val failpoint.Value) {
msg := val.(string)
log.Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg))
logutil.CL(ctx).Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Msg: msg,
}
Expand All @@ -150,28 +150,28 @@ func (push *pushDown) pushBackup(
errPb := resp.GetError()
switch v := errPb.Detail.(type) {
case *backuppb.Error_KvError:
log.Warn("backup occur kv error", zap.Reflect("error", v))
logutil.CL(ctx).Warn("backup occur kv error", zap.Reflect("error", v))

case *backuppb.Error_RegionError:
log.Warn("backup occur region error", zap.Reflect("error", v))
logutil.CL(ctx).Warn("backup occur region error", zap.Reflect("error", v))

case *backuppb.Error_ClusterIdError:
log.Error("backup occur cluster ID error", zap.Reflect("error", v))
logutil.CL(ctx).Error("backup occur cluster ID error", zap.Reflect("error", v))
return res, errors.Annotatef(berrors.ErrKVClusterIDMismatch, "%v", errPb)
default:
if utils.MessageIsRetryableStorageError(errPb.GetMsg()) {
log.Warn("backup occur storage error", zap.String("error", errPb.GetMsg()))
logutil.CL(ctx).Warn("backup occur storage error", zap.String("error", errPb.GetMsg()))
continue
}
if utils.MessageIsNotFoundStorageError(errPb.GetMsg()) {
errMsg := fmt.Sprintf("File or directory not found error occurs on TiKV Node(store id: %v; Address: %s)", store.GetId(), redact.String(store.GetAddress()))
log.Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg),
logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg),
zap.String("work around", "please ensure br and tikv node share a same disk and the user of br and tikv has same uid."))
}

if utils.MessageIsPermissionDeniedStorageError(errPb.GetMsg()) {
errMsg := fmt.Sprintf("I/O permission denied error occurs on TiKV Node(store id: %v; Address: %s)", store.GetId(), redact.String(store.GetAddress()))
log.Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg),
logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg),
zap.String("work around", "please ensure tikv has permission to read from & write to the storage."))
}
return res, berrors.ErrKVStorage
Expand All @@ -181,7 +181,7 @@ func (push *pushDown) pushBackup(
if !berrors.Is(err, berrors.ErrFailedToConnect) {
return res, errors.Annotatef(err, "failed to backup range [%s, %s)", redact.Key(req.StartKey), redact.Key(req.EndKey))
}
log.Warn("skipping disconnected stores", logutil.ShortError(err))
logutil.CL(ctx).Warn("skipping disconnected stores", logutil.ShortError(err))
return res, nil
}
}
Expand Down
14 changes: 7 additions & 7 deletions br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,17 +120,17 @@ func autoRandomIncrementBits(col *table.Column, randomBits int) int {
}

// collectGeneratedColumns collects all expressions required to evaluate the
// results of all stored generated columns. The returning slice is in evaluation
// order.
// results of all generated columns. The returning slice is in evaluation order.
func collectGeneratedColumns(se *session, meta *model.TableInfo, cols []*table.Column) ([]genCol, error) {
maxGenColOffset := -1
hasGenCol := false
for _, col := range cols {
if col.GeneratedStored && col.Offset > maxGenColOffset {
maxGenColOffset = col.Offset
if col.GeneratedExpr != nil {
hasGenCol = true
break
}
}

if maxGenColOffset < 0 {
if !hasGenCol {
return nil, nil
}

Expand Down Expand Up @@ -165,7 +165,7 @@ func collectGeneratedColumns(se *session, meta *model.TableInfo, cols []*table.C
// for simplicity we just evaluate all generated columns (virtual or not) before the last stored one.
var genCols []genCol
for i, col := range cols {
if col.GeneratedExpr != nil && col.Offset <= maxGenColOffset {
if col.GeneratedExpr != nil {
expr, err := expression.RewriteAstExpr(se, col.GeneratedExpr, schema, names)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit a648667

Please sign in to comment.