Skip to content

Commit

Permalink
Merge commit 'cc25ce9f3fbc6971e757b2c4c856827144477784' into br_keep_…
Browse files Browse the repository at this point in the history
…upstream
  • Loading branch information
3pointer committed Aug 6, 2021
2 parents b32ce9b + cc25ce9 commit ac0327c
Show file tree
Hide file tree
Showing 39 changed files with 1,121 additions and 1,478 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

include Makefile.common

.PHONY: all clean test gotest server dev benchkv benchraw check checklist parser tidy ddltest br_web
.PHONY: all clean test gotest server dev benchkv benchraw check checklist parser tidy ddltest

default: server buildsucc

Expand Down
25 changes: 0 additions & 25 deletions a

This file was deleted.

61 changes: 0 additions & 61 deletions br/go.mod1

This file was deleted.

987 changes: 0 additions & 987 deletions br/go.sum1

This file was deleted.

64 changes: 31 additions & 33 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,10 +421,12 @@ func (bc *Client) BackupRanges(
// we collect all files in a single goroutine to avoid thread safety issues.
workerPool := utils.NewWorkerPool(concurrency, "Ranges")
eg, ectx := errgroup.WithContext(ctx)
for _, r := range ranges {
for id, r := range ranges {
id := id
sk, ek := r.StartKey, r.EndKey
workerPool.ApplyOnErrorGroup(eg, func() error {
err := bc.BackupRange(ectx, sk, ek, req, metaWriter, progressCallBack)
elctx := logutil.ContextWithField(ectx, logutil.RedactAny("range-sn", id))
err := bc.BackupRange(elctx, sk, ek, req, metaWriter, progressCallBack)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -446,15 +448,14 @@ func (bc *Client) BackupRange(
start := time.Now()
defer func() {
elapsed := time.Since(start)
log.Info("backup range finished", zap.Duration("take", elapsed))
logutil.CL(ctx).Info("backup range finished", zap.Duration("take", elapsed))
key := "range start:" + hex.EncodeToString(startKey) + " end:" + hex.EncodeToString(endKey)
if err != nil {
summary.CollectFailureUnit(key, err)
}
}()
log.Info("backup started",
logutil.Key("startKey", startKey),
logutil.Key("endKey", endKey),
logutil.CL(ctx).Info("backup started",
logutil.Key("startKey", startKey), logutil.Key("endKey", endKey),
zap.Uint64("rateLimit", req.RateLimit),
zap.Uint32("concurrency", req.Concurrency))

Expand All @@ -475,7 +476,7 @@ func (bc *Client) BackupRange(
if err != nil {
return errors.Trace(err)
}
log.Info("finish backup push down", zap.Int("Ok", results.Len()))
logutil.CL(ctx).Info("finish backup push down", zap.Int("small-range-count", results.Len()))

// Find and backup remaining ranges.
// TODO: test fine grained backup.
Expand All @@ -490,12 +491,12 @@ func (bc *Client) BackupRange(
progressCallBack(RangeUnit)

if req.IsRawKv {
log.Info("backup raw ranges",
logutil.CL(ctx).Info("raw ranges backed up",
logutil.Key("startKey", startKey),
logutil.Key("endKey", endKey),
zap.String("cf", req.Cf))
} else {
log.Info("backup time range",
logutil.CL(ctx).Info("time range backed up",
zap.Reflect("StartVersion", req.StartVersion),
zap.Reflect("EndVersion", req.EndVersion))
}
Expand Down Expand Up @@ -590,7 +591,7 @@ func (bc *Client) fineGrainedBackup(
if len(incomplete) == 0 {
return nil
}
log.Info("start fine grained backup", zap.Int("incomplete", len(incomplete)))
logutil.CL(ctx).Info("start fine grained backup", zap.Int("incomplete", len(incomplete)))
// Step2, retry backup on incomplete range
respCh := make(chan *backuppb.BackupResponse, 4)
errCh := make(chan error, 4)
Expand Down Expand Up @@ -647,12 +648,12 @@ func (bc *Client) fineGrainedBackup(
break selectLoop
}
if resp.Error != nil {
log.Panic("unexpected backup error",
logutil.CL(ctx).Panic("unexpected backup error",
zap.Reflect("error", resp.Error))
}
log.Info("put fine grained range",
logutil.Key("startKey", resp.StartKey),
logutil.Key("endKey", resp.EndKey),
logutil.CL(ctx).Info("put fine grained range",
logutil.Key("fine-grained-range-start", resp.StartKey),
logutil.Key("fine-grained-range-end", resp.EndKey),
)
rangeTree.Put(resp.StartKey, resp.EndKey, resp.Files)

Expand Down Expand Up @@ -780,11 +781,11 @@ func (bc *Client) handleFineGrained(
if berrors.Is(err, berrors.ErrFailedToConnect) {
// When the leader store is died,
// 20s for the default max duration before the raft election timer fires.
log.Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID))
logutil.CL(ctx).Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID))
return 20000, nil
}

log.Error("fail to connect store", zap.Uint64("StoreID", storeID))
logutil.CL(ctx).Error("fail to connect store", zap.Uint64("StoreID", storeID))
return 0, errors.Annotatef(err, "failed to connect to store %d", storeID)
}
hasProgress := false
Expand All @@ -811,17 +812,17 @@ func (bc *Client) handleFineGrained(
return nil
},
func() (backuppb.BackupClient, error) {
log.Warn("reset the connection in handleFineGrained", zap.Uint64("storeID", storeID))
logutil.CL(ctx).Warn("reset the connection in handleFineGrained", zap.Uint64("storeID", storeID))
return bc.mgr.ResetBackupClient(ctx, storeID)
})
if err != nil {
if berrors.Is(err, berrors.ErrFailedToConnect) {
// When the leader store is died,
// 20s for the default max duration before the raft election timer fires.
log.Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID))
logutil.CL(ctx).Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID))
return 20000, nil
}
log.Error("failed to send fine-grained backup", zap.Uint64("storeID", storeID), logutil.ShortError(err))
logutil.CL(ctx).Error("failed to send fine-grained backup", zap.Uint64("storeID", storeID), logutil.ShortError(err))
return 0, errors.Annotatef(err, "failed to send fine-grained backup [%s, %s)",
redact.Key(req.StartKey), redact.Key(req.EndKey))
}
Expand All @@ -839,6 +840,7 @@ func (bc *Client) handleFineGrained(
// Stop receiving response if respFn returns error.
func SendBackup(
ctx context.Context,
// the `storeID` seems only used for logging now, maybe we can remove it then?
storeID uint64,
client backuppb.BackupClient,
req backuppb.BackupRequest,
Expand All @@ -857,14 +859,11 @@ func SendBackup(
var errReset error
backupLoop:
for retry := 0; retry < backupRetryTimes; retry++ {
log.Info("try backup",
logutil.Key("startKey", req.StartKey),
logutil.Key("endKey", req.EndKey),
zap.Uint64("storeID", storeID),
logutil.CL(ctx).Info("try backup",
zap.Int("retry time", retry),
)
failpoint.Inject("hint-backup-start", func(v failpoint.Value) {
log.Info("failpoint hint-backup-start injected, " +
logutil.CL(ctx).Info("failpoint hint-backup-start injected, " +
"process will notify the shell.")
if sigFile, ok := v.(string); ok {
file, err := os.Create(sigFile)
Expand All @@ -880,13 +879,13 @@ backupLoop:
bcli, err := client.Backup(ctx, &req)
failpoint.Inject("reset-retryable-error", func(val failpoint.Value) {
if val.(bool) {
log.Debug("failpoint reset-retryable-error injected.")
logutil.CL(ctx).Debug("failpoint reset-retryable-error injected.")
err = status.Error(codes.Unavailable, "Unavailable error")
}
})
failpoint.Inject("reset-not-retryable-error", func(val failpoint.Value) {
if val.(bool) {
log.Debug("failpoint reset-not-retryable-error injected.")
logutil.CL(ctx).Debug("failpoint reset-not-retryable-error injected.")
err = status.Error(codes.Unknown, "Your server was haunted hence doesn't work, meow :3")
}
})
Expand All @@ -900,7 +899,7 @@ backupLoop:
}
continue
}
log.Error("fail to backup", zap.Uint64("StoreID", storeID),
logutil.CL(ctx).Error("fail to backup", zap.Uint64("StoreID", storeID),
zap.Int("retry time", retry))
return berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to create backup stream to store %d", storeID)
}
Expand All @@ -910,9 +909,8 @@ backupLoop:
resp, err := bcli.Recv()
if err != nil {
if errors.Cause(err) == io.EOF { // nolint:errorlint
log.Info("backup streaming finish",
zap.Uint64("StoreID", storeID),
zap.Int("retry time", retry))
logutil.CL(ctx).Info("backup streaming finish",
zap.Int("retry-time", retry))
break backupLoop
}
if isRetryableError(err) {
Expand All @@ -929,9 +927,9 @@ backupLoop:
}

// TODO: handle errors in the resp.
log.Info("range backuped",
logutil.Key("startKey", resp.GetStartKey()),
logutil.Key("endKey", resp.GetEndKey()))
logutil.CL(ctx).Info("range backed up",
logutil.Key("small-range-start-key", resp.GetStartKey()),
logutil.Key("small-range-end-key", resp.GetEndKey()))
err = respFn(resp)
if err != nil {
return errors.Trace(err)
Expand Down
Loading

0 comments on commit ac0327c

Please sign in to comment.