Merge branch 'master' into feature/dropping-multi-indexes
zimulala authored Aug 26, 2021
2 parents 6556377 + cdadfdb commit 6e46603
Showing 80 changed files with 2,386 additions and 931 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -14,7 +14,7 @@

include Makefile.common

.PHONY: all clean test gotest server dev benchkv benchraw check checklist parser tidy ddltest
.PHONY: all clean test gotest server dev benchkv benchraw check checklist parser tidy ddltest build_br build_lightning build_lightning-ctl

default: server buildsucc

7 changes: 5 additions & 2 deletions br/pkg/backup/push.go
@@ -168,13 +168,16 @@ func (push *pushDown) pushBackup(
logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg),
zap.String("work around", "please ensure br and tikv node share a same disk and the user of br and tikv has same uid."))
}

if utils.MessageIsPermissionDeniedStorageError(errPb.GetMsg()) {
errMsg := fmt.Sprintf("I/O permission denied error occurs on TiKV Node(store id: %v; Address: %s)", store.GetId(), redact.String(store.GetAddress()))
logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg),
zap.String("work around", "please ensure tikv has permission to read from & write to the storage."))
}
return res, berrors.ErrKVStorage
return res, errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v at %s: %s",
store.GetId(),
redact.String(store.GetAddress()),
errPb.Msg,
)
}
}
case err := <-push.errCh:
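The push.go change above replaces the bare sentinel return with errors.Annotatef, so the error that reaches the caller names the offending store while the sentinel stays matchable. A minimal standalone sketch of that pattern using github.com/pingcap/errors (the errKVStorage sentinel and annotateStoreError helper below are illustrative stand-ins, not code from this commit):

package main

import (
	"fmt"

	"github.com/pingcap/errors"
)

// errKVStorage stands in for a sentinel error such as berrors.ErrKVStorage.
var errKVStorage = errors.New("kv storage error")

// annotateStoreError adds store context to the sentinel while keeping it in the error chain.
func annotateStoreError(storeID uint64, addr, msg string) error {
	return errors.Annotatef(errKVStorage, "error happened in store %v at %s: %s", storeID, addr, msg)
}

func main() {
	err := annotateStoreError(1, "127.0.0.1:20160", "disk full")
	fmt.Println(err)                               // message now carries the store id and address
	fmt.Println(errors.Cause(err) == errKVStorage) // true: callers can still match the sentinel
}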
10 changes: 1 addition & 9 deletions br/pkg/lightning/mydump/parquet_parser_test.go
@@ -5,7 +5,6 @@ import (
"io"
"path/filepath"
"strconv"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/br/pkg/storage"
@@ -82,13 +81,6 @@ func (s testParquetParserSuite) TestParquetParser(c *C) {
}

func (s testParquetParserSuite) TestParquetVariousTypes(c *C) {
// those deprecated TIME/TIMESTAMP types depend on the local timezone!
prevTZ := time.Local
time.Local = time.FixedZone("UTC+8", 8*60*60)
defer func() {
time.Local = prevTZ
}()

type Test struct {
Date int32 `parquet:"name=date, type=DATE"`
TimeMillis int32 `parquet:"name=timemillis, type=TIME_MILLIS"`
@@ -114,7 +106,7 @@

v := &Test{
Date: 18564, // 2020-10-29
TimeMillis: 62775123, // 17:26:15.123 (note all time are in UTC+8!)
TimeMillis: 62775123, // 17:26:15.123
TimeMicros: 62775123456, // 17:26:15.123
TimestampMillis: 1603963672356, // 2020-10-29T09:27:52.356Z
TimestampMicros: 1603963672356956, // 2020-10-29T09:27:52.356956Z
2 changes: 1 addition & 1 deletion br/pkg/logutil/logging.go
@@ -115,7 +115,7 @@ func (region zapMarshalRegionMarshaler) MarshalLogObject(enc zapcore.ObjectEncod
for _, peer := range region.GetPeers() {
peers = append(peers, peer.String())
}
enc.AddUint64("ID", region.Id)
enc.AddUint64("ID", region.GetId())
enc.AddString("startKey", redact.Key(region.GetStartKey()))
enc.AddString("endKey", redact.Key(region.GetEndKey()))
enc.AddString("epoch", region.GetRegionEpoch().String())
11 changes: 9 additions & 2 deletions br/pkg/restore/import.go
@@ -259,6 +259,7 @@ func (importer *FileImporter) Import(
files []*backuppb.File,
rewriteRules *RewriteRules,
) error {
start := time.Now()
log.Debug("import file", logutil.Files(files))
// Rewrite the start key and end key of file to scan regions
var startKey, endKey []byte
@@ -340,6 +341,8 @@
logutil.Region(info.Region),
logutil.Key("startKey", startKey),
logutil.Key("endKey", endKey),
logutil.Key("file-simple-start", files[0].StartKey),
logutil.Key("file-simple-end", files[0].EndKey),
logutil.ShortError(e))
continue regionLoop
}
@@ -352,7 +355,10 @@
logutil.ShortError(errDownload))
return errors.Trace(errDownload)
}

log.Debug("download file done", zap.String("file-sample", files[0].Name), zap.Stringer("take", time.Since(start)),
logutil.Key("start", files[0].StartKey),
logutil.Key("end", files[0].EndKey),
)
ingestResp, errIngest := importer.ingestSSTs(ctx, downloadMetas, info)
ingestRetry:
for errIngest == nil {
@@ -418,6 +424,7 @@ func (importer *FileImporter) Import(
return errors.Trace(errIngest)
}
}
log.Debug("ingest file done", zap.String("file-sample", files[0].Name), zap.Stringer("take", time.Since(start)))
for _, f := range files {
summary.CollectSuccessUnit(summary.TotalKV, 1, f.TotalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, f.TotalBytes)
@@ -451,7 +458,7 @@ func (importer *FileImporter) downloadSST(
}
regionRule := matchNewPrefix(key, rewriteRules)
if regionRule == nil {
return nil, errors.Trace(berrors.ErrKVRewriteRuleNotFound)
return nil, errors.Annotate(berrors.ErrKVRewriteRuleNotFound, "failed to find rewrite rule.")
}
rule := import_sstpb.RewriteRule{
OldKeyPrefix: encodeKeyPrefix(regionRule.GetOldKeyPrefix()),
149 changes: 120 additions & 29 deletions br/pkg/restore/pipeline_items.go
@@ -12,7 +12,9 @@ import (
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/utils"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

const (
@@ -73,6 +75,7 @@ type brContextManager struct {

// This 'set' of table IDs allows us to handle each table just once.
hasTable map[int64]CreatedTable
mu sync.Mutex
}

func (manager *brContextManager) Close(ctx context.Context) {
@@ -85,6 +88,8 @@ func (manager *brContextManager) Close(ctx context.Context) {

func (manager *brContextManager) Enter(ctx context.Context, tables []CreatedTable) error {
placementRuleTables := make([]*model.TableInfo, 0, len(tables))
manager.mu.Lock()
defer manager.mu.Unlock()

for _, tbl := range tables {
if _, ok := manager.hasTable[tbl.Table.ID]; !ok {
@@ -97,6 +102,8 @@
}

func (manager *brContextManager) Leave(ctx context.Context, tables []CreatedTable) error {
manager.mu.Lock()
defer manager.mu.Unlock()
placementRuleTables := make([]*model.TableInfo, 0, len(tables))

for _, table := range tables {
@@ -182,6 +189,8 @@ type tikvSender struct {
inCh chan<- DrainResult

wg *sync.WaitGroup

tableWaiters *sync.Map
}

func (b *tikvSender) PutSink(sink TableSink) {
@@ -191,6 +200,7 @@ func (b *tikvSender) PutSink(sink TableSink) {
}

func (b *tikvSender) RestoreBatch(ranges DrainResult) {
log.Info("restore batch: waiting ranges", zap.Int("range", len(b.inCh)))
b.inCh <- ranges
}

@@ -199,29 +209,52 @@ func NewTiKVSender(
ctx context.Context,
cli *Client,
updateCh glue.Progress,
splitConcurrency uint,
) (BatchSender, error) {
inCh := make(chan DrainResult, defaultChannelSize)
midCh := make(chan DrainResult, defaultChannelSize)
midCh := make(chan drainResultAndDone, defaultChannelSize)

sender := &tikvSender{
client: cli,
updateCh: updateCh,
inCh: inCh,
wg: new(sync.WaitGroup),
client: cli,
updateCh: updateCh,
inCh: inCh,
wg: new(sync.WaitGroup),
tableWaiters: new(sync.Map),
}

sender.wg.Add(2)
go sender.splitWorker(ctx, inCh, midCh)
go sender.splitWorker(ctx, inCh, midCh, splitConcurrency)
go sender.restoreWorker(ctx, midCh)
return sender, nil
}

func (b *tikvSender) splitWorker(ctx context.Context, ranges <-chan DrainResult, next chan<- DrainResult) {
func (b *tikvSender) Close() {
close(b.inCh)
b.wg.Wait()
log.Debug("tikv sender closed")
}

type drainResultAndDone struct {
result DrainResult
done func()
}

func (b *tikvSender) splitWorker(ctx context.Context,
ranges <-chan DrainResult,
next chan<- drainResultAndDone,
concurrency uint,
) {
defer log.Debug("split worker closed")
eg, ectx := errgroup.WithContext(ctx)
defer func() {
b.wg.Done()
if err := eg.Wait(); err != nil {
b.sink.EmitError(err)
return
}
close(next)
}()
pool := utils.NewWorkerPool(concurrency, "split")
for {
select {
case <-ctx.Done():
@@ -230,44 +263,102 @@ func (b *tikvSender) splitWorker(ctx context.Context, ranges <-chan DrainResult,
if !ok {
return
}
if err := SplitRanges(ctx, b.client, result.Ranges, result.RewriteRules, b.updateCh); err != nil {
log.Error("failed on split range", rtree.ZapRanges(result.Ranges), zap.Error(err))
b.sink.EmitError(err)
return
}
next <- result
// When the batcher has sent all ranges from a table, it marks
// the table 'all done' (BlankTablesAfterSend), and then we can send it to checksum.
//
// When a sole worker runs those batch tasks sequentially, everything is fine. However,
// with multiple workers this becomes buggy, for example:
// |------table 1, ranges 1------|------table 1, ranges 2------|
// The batcher sends batches: [
// {Ranges: ranges 1},
// {Ranges: ranges 2, BlankTablesAfterSend: table 1}
// ]
// And two workers run concurrently:
// worker 1: {Ranges: ranges 1}
// worker 2: {Ranges: ranges 2, BlankTablesAfterSend: table 1}
// And worker 2 may finish its job before worker 1 is done. Note that the table wasn't fully restored yet,
// hence the checksum would fail.
done := b.registerTableIsRestoring(result.TablesToSend)
pool.ApplyOnErrorGroup(eg, func() error {
err := SplitRanges(ectx, b.client, result.Ranges, result.RewriteRules, b.updateCh)
if err != nil {
log.Error("failed on split range", rtree.ZapRanges(result.Ranges), zap.Error(err))
return err
}
next <- drainResultAndDone{
result: result,
done: done,
}
return nil
})
}
}
}

// registerTableIsRestoring marks some tables as 'currently restoring'.
// It returns a function that marks the restore of these tables as done.
func (b *tikvSender) registerTableIsRestoring(ts []CreatedTable) func() {
wgs := make([]*sync.WaitGroup, 0, len(ts))
for _, t := range ts {
i, _ := b.tableWaiters.LoadOrStore(t.Table.ID, new(sync.WaitGroup))
wg := i.(*sync.WaitGroup)
wg.Add(1)
wgs = append(wgs, wg)
}
return func() {
for _, wg := range wgs {
wg.Done()
}
}
}

// waitTablesDone blocks the current goroutine
// until all provided tables are no longer 'currently restoring'.
func (b *tikvSender) waitTablesDone(ts []CreatedTable) {
for _, t := range ts {
wg, ok := b.tableWaiters.LoadAndDelete(t.Table.ID)
if !ok {
log.Panic("bug! table done before register!",
zap.Any("wait-table-map", b.tableWaiters),
zap.Stringer("table", t.Table.Name))
}
wg.(*sync.WaitGroup).Wait()
}
}

func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan DrainResult) {
func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan drainResultAndDone) {
eg, ectx := errgroup.WithContext(ctx)
defer func() {
log.Debug("restore worker closed")
if err := eg.Wait(); err != nil {
b.sink.EmitError(err)
return
}
b.wg.Done()
b.sink.Close()
}()
for {
select {
case <-ctx.Done():
return
case result, ok := <-ranges:
case r, ok := <-ranges:
if !ok {
return
}
files := result.Files()
if err := b.client.RestoreFiles(ctx, files, result.RewriteRules, b.updateCh); err != nil {
b.sink.EmitError(err)
return
}

log.Info("restore batch done", rtree.ZapRanges(result.Ranges))
b.sink.EmitTables(result.BlankTablesAfterSend...)
files := r.result.Files()
// There is already a worker pool inside the `RestoreFiles` procedure,
// so spawning a raw goroutine here won't make too many requests to TiKV.
eg.Go(func() error {
e := b.client.RestoreFiles(ectx, files, r.result.RewriteRules, b.updateCh)
if e != nil {
return e
}
log.Info("restore batch done", rtree.ZapRanges(r.result.Ranges))
r.done()
b.waitTablesDone(r.result.BlankTablesAfterSend)
b.sink.EmitTables(r.result.BlankTablesAfterSend...)
return nil
})
}
}
}

func (b *tikvSender) Close() {
close(b.inCh)
b.wg.Wait()
log.Debug("tikv sender closed")
}
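The long comment inside splitWorker above explains why a table must not be handed to checksum until every concurrent batch touching it has finished; registerTableIsRestoring and waitTablesDone enforce this with one sync.WaitGroup per table ID kept in a sync.Map. A stripped-down sketch of the same pattern (the tableGate type and its method names are illustrative, not taken from the commit):

package main

import (
	"fmt"
	"sync"
)

// tableGate tracks, per table ID, how many restore batches are still in flight.
type tableGate struct {
	waiters sync.Map // table ID -> *sync.WaitGroup
}

// register marks the given tables as "currently restoring" and returns a
// function that the worker calls once its batch for them is done.
func (g *tableGate) register(tableIDs []int64) func() {
	wgs := make([]*sync.WaitGroup, 0, len(tableIDs))
	for _, id := range tableIDs {
		i, _ := g.waiters.LoadOrStore(id, new(sync.WaitGroup))
		wg := i.(*sync.WaitGroup)
		wg.Add(1)
		wgs = append(wgs, wg)
	}
	return func() {
		for _, wg := range wgs {
			wg.Done()
		}
	}
}

// wait blocks until every registered batch for the table has called done.
func (g *tableGate) wait(tableID int64) {
	if wg, ok := g.waiters.LoadAndDelete(tableID); ok {
		wg.(*sync.WaitGroup).Wait()
	}
}

func main() {
	g := new(tableGate)
	done1 := g.register([]int64{1}) // batch 1 of table 1
	done2 := g.register([]int64{1}) // batch 2 of table 1, carries BlankTablesAfterSend

	go done1() // batch 1 finishes some time later
	done2()    // batch 2 finishes first

	g.wait(1) // returns only after both batches are done, so checksum is safe
	fmt.Println("table 1 fully restored")
}

In the commit itself, the returned done callback runs after RestoreFiles succeeds for a batch, and waitTablesDone runs right before EmitTables hands the fully restored table to checksum.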
2 changes: 2 additions & 0 deletions br/pkg/restore/range.go
@@ -72,12 +72,14 @@ func SortRanges(ranges []rtree.Range, rewriteRules *RewriteRules) ([]rtree.Range
"rewrite start key",
logutil.Key("key", rg.StartKey), logutil.RewriteRule(rule))
}
oldKey := rg.EndKey
rg.EndKey, rule = replacePrefix(rg.EndKey, rewriteRules)
if rule == nil {
log.Warn("cannot find rewrite rule", logutil.Key("key", rg.EndKey))
} else {
log.Debug(
"rewrite end key",
logutil.Key("origin-key", oldKey),
logutil.Key("key", rg.EndKey),
logutil.RewriteRule(rule))
}
17 changes: 5 additions & 12 deletions br/pkg/restore/split.go
@@ -99,26 +99,19 @@ func (rs *RegionSplitter) Split(
maxKey = rule.GetNewKeyPrefix()
}
}
for _, rule := range rewriteRules.Data {
if bytes.Compare(minKey, rule.GetNewKeyPrefix()) > 0 {
minKey = rule.GetNewKeyPrefix()
}
if bytes.Compare(maxKey, rule.GetNewKeyPrefix()) < 0 {
maxKey = rule.GetNewKeyPrefix()
}
}
interval := SplitRetryInterval
scatterRegions := make([]*RegionInfo, 0)
SplitRegions:
for i := 0; i < SplitRetryTimes; i++ {
regions, errScan := PaginateScanRegion(ctx, rs.client, minKey, maxKey, ScanRegionPaginationLimit)
if errScan != nil {
if berrors.ErrPDBatchScanRegion.Equal(errScan) {
log.Warn("inconsistent region info get.", logutil.ShortError(errScan))
time.Sleep(time.Second)
continue SplitRegions
}
return errors.Trace(errScan)
}
if len(regions) == 0 {
log.Warn("split regions cannot scan any region")
return nil
}
splitKeyMap := getSplitKeys(rewriteRules, sortedRanges, regions)
regionMap := make(map[uint64]*RegionInfo)
for _, region := range regions {
(The remaining changed files in this commit are not shown here.)