diff --git a/cmd/br/debug.go b/cmd/br/debug.go index a397ee3223e41..f010adbd892f7 100644 --- a/cmd/br/debug.go +++ b/cmd/br/debug.go @@ -11,8 +11,6 @@ import ( "path" "reflect" - "github.com/pingcap/br/pkg/metautil" - "github.com/gogo/protobuf/proto" "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/backup" @@ -24,6 +22,7 @@ import ( berrors "github.com/pingcap/br/pkg/errors" "github.com/pingcap/br/pkg/logutil" + "github.com/pingcap/br/pkg/metautil" "github.com/pingcap/br/pkg/mock/mockid" "github.com/pingcap/br/pkg/restore" "github.com/pingcap/br/pkg/rtree" diff --git a/go.mod1 b/go.mod1 index 6e9fe5b89e770..5f767c2a8944f 100644 --- a/go.mod1 +++ b/go.mod1 @@ -28,8 +28,8 @@ require ( github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd github.com/pingcap/kvproto v0.0.0-20210722091755-91a52cd9e8db github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7 - github.com/pingcap/parser v0.0.0-20210707071004-31c87e37af5c - github.com/pingcap/tidb v0.0.0-20210727021217-150fe0c37755 + github.com/pingcap/parser v0.0.0-20210805052952-eda5e763a66e + github.com/pingcap/tidb v1.1.0-beta.0.20210805015907-39aae6a455d5 github.com/pingcap/tidb-tools v5.0.3+incompatible github.com/pingcap/tipb v0.0.0-20210708040514-0f154bb0dc0f github.com/prometheus/client_golang v1.5.1 @@ -37,7 +37,7 @@ require ( github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 github.com/spf13/cobra v1.0.0 github.com/spf13/pflag v1.0.5 - github.com/tikv/client-go/v2 v2.0.0-alpha.0.20210726054004-ce977e34b0dd + github.com/tikv/client-go/v2 v2.0.0-alpha.0.20210727120905-55155ad2e543 github.com/tikv/pd v1.1.0-beta.0.20210323121136-78679e5e209d github.com/xitongsys/parquet-go v1.5.5-0.20201110004701-b09c49d6d457 github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0 diff --git a/go.sum1 b/go.sum1 index fc49e8393ab3f..963860353a915 100644 --- a/go.sum1 +++ b/go.sum1 @@ -457,7 +457,6 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod 
h1:O17Xtb github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20210527074428-73468940541b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20210712050333-b66fdbd6bfd5/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20210722091755-91a52cd9e8db h1:PSW6P83KZi5WopPBiecU286PWMSl2rvxCBZT94iBX+I= github.com/pingcap/kvproto v0.0.0-20210722091755-91a52cd9e8db/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= @@ -466,13 +465,14 @@ github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIf github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7 h1:k2BbABz9+TNpYRwsCCFS8pEEnFVOdbgEjL/kTlLuzZQ= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/parser v0.0.0-20210525032559-c37778aff307/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= -github.com/pingcap/parser v0.0.0-20210707071004-31c87e37af5c h1:FPBMwDTtMW25zyLLBrIMUnM7Zl/WtLxR9LSEiaL5hII= -github.com/pingcap/parser v0.0.0-20210707071004-31c87e37af5c/go.mod h1:Ek0mLKEqUGnQqBw1JnYrJQxsguU433DU68yUbsoeJ7s= +github.com/pingcap/parser v0.0.0-20210802034743-dd9b189324ce/go.mod h1:Ek0mLKEqUGnQqBw1JnYrJQxsguU433DU68yUbsoeJ7s= +github.com/pingcap/parser v0.0.0-20210805052952-eda5e763a66e h1:EymNdGyPn7q/iR4gsa6QFvRbTsrk4p5vGEtJ3JCWE1Q= +github.com/pingcap/parser v0.0.0-20210805052952-eda5e763a66e/go.mod h1:Ek0mLKEqUGnQqBw1JnYrJQxsguU433DU68yUbsoeJ7s= github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI= github.com/pingcap/sysutil 
v0.0.0-20210315073920-cc0985d983a3 h1:A9KL9R+lWSVPH8IqUuH1QSTRJ5FGoY1bT2IcfPKsWD8= github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041UWP+NqYzrJ3fMgC/Hw9wnmQ/tUkp/JaHly8= -github.com/pingcap/tidb v0.0.0-20210727021217-150fe0c37755 h1:1TxBadczCQGoOtmKh0VPRiT60tTH0qFLFaNRJO45KIk= -github.com/pingcap/tidb v0.0.0-20210727021217-150fe0c37755/go.mod h1:HdSBIf/qkvB7PSVEzr3vO++xrTKO5E8WjWMr3nTMLtY= +github.com/pingcap/tidb v1.1.0-beta.0.20210805015907-39aae6a455d5 h1:mADjBffSIHXec5mB+rUS+pfGzA/JRUSnkgYc7v0fSwc= +github.com/pingcap/tidb v1.1.0-beta.0.20210805015907-39aae6a455d5/go.mod h1:8n0VPQrelzJ09VYFg1eyvh7kGn3CmoWomwKXvDxDd2Y= github.com/pingcap/tidb-dashboard v0.0.0-20210512074702-4ee3e3909d5e/go.mod h1:7HnQAeqKOuJwCBUeglCUel7SjW6fPNnoXawuv+6Q6Ek= github.com/pingcap/tidb-tools v5.0.3+incompatible h1:vYMrW9ux+3HRMeRZ1fUOjy2nyiodtuVyAyK270EKBEs= github.com/pingcap/tidb-tools v5.0.3+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= @@ -586,9 +586,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfK github.com/tidwall/gjson v1.3.5/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls= github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= -github.com/tikv/client-go/v2 v2.0.0-alpha.0.20210723073051-3d747cd203ba/go.mod h1:+bOiuuZZUqIq19EqyhTWQFaB0PeXLOh/il1vnVZx3Tk= -github.com/tikv/client-go/v2 v2.0.0-alpha.0.20210726054004-ce977e34b0dd h1:Wi2kmhetf3CmflXquPSNNxTaFbpHg0nM4X36BqEG2y0= -github.com/tikv/client-go/v2 v2.0.0-alpha.0.20210726054004-ce977e34b0dd/go.mod h1:+bOiuuZZUqIq19EqyhTWQFaB0PeXLOh/il1vnVZx3Tk= +github.com/tikv/client-go/v2 v2.0.0-alpha.0.20210727120905-55155ad2e543 h1:Yi/Xn7NdbxicB/4Ve3myyqtEkqiVaAjWYUDTTETBdFg= +github.com/tikv/client-go/v2 v2.0.0-alpha.0.20210727120905-55155ad2e543/go.mod h1:LP8O6zZXAWKU781u1nt/v9nC0hxPipPxOxSoZT9Goqg= github.com/tikv/pd 
v1.1.0-beta.0.20210609101029-3ba158cf41a4 h1:QljEYXHc2krYg6f1zol6f7Ut9QBDtK1UVm9UZOT0YFE= github.com/tikv/pd v1.1.0-beta.0.20210609101029-3ba158cf41a4/go.mod h1:4AiyUYyIG4cA7P+xDU/Mep0yo4Hb+2IfFstiOSKFbJ4= github.com/tklauser/go-sysconf v0.3.4 h1:HT8SVixZd3IzLdfs/xlpq0jeSfTX57g1v6wB1EuzV7M= diff --git a/pkg/backup/client.go b/pkg/backup/client.go index bf595fa5ce896..c34abc6b96e9a 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -423,10 +423,12 @@ func (bc *Client) BackupRanges( // we collect all files in a single goroutine to avoid thread safety issues. workerPool := utils.NewWorkerPool(concurrency, "Ranges") eg, ectx := errgroup.WithContext(ctx) - for _, r := range ranges { + for id, r := range ranges { + id := id sk, ek := r.StartKey, r.EndKey workerPool.ApplyOnErrorGroup(eg, func() error { - err := bc.BackupRange(ectx, sk, ek, req, metaWriter, progressCallBack) + elctx := logutil.ContextWithField(ectx, logutil.RedactAny("range-sn", id)) + err := bc.BackupRange(elctx, sk, ek, req, metaWriter, progressCallBack) if err != nil { return errors.Trace(err) } @@ -448,15 +450,14 @@ func (bc *Client) BackupRange( start := time.Now() defer func() { elapsed := time.Since(start) - log.Info("backup range finished", zap.Duration("take", elapsed)) + logutil.CL(ctx).Info("backup range finished", zap.Duration("take", elapsed)) key := "range start:" + hex.EncodeToString(startKey) + " end:" + hex.EncodeToString(endKey) if err != nil { summary.CollectFailureUnit(key, err) } }() - log.Info("backup started", - logutil.Key("startKey", startKey), - logutil.Key("endKey", endKey), + logutil.CL(ctx).Info("backup started", + logutil.Key("startKey", startKey), logutil.Key("endKey", endKey), zap.Uint64("rateLimit", req.RateLimit), zap.Uint32("concurrency", req.Concurrency)) @@ -477,7 +478,7 @@ func (bc *Client) BackupRange( if err != nil { return errors.Trace(err) } - log.Info("finish backup push down", zap.Int("Ok", results.Len())) + logutil.CL(ctx).Info("finish backup push 
down", zap.Int("small-range-count", results.Len())) // Find and backup remaining ranges. // TODO: test fine grained backup. @@ -492,12 +493,12 @@ func (bc *Client) BackupRange( progressCallBack(RangeUnit) if req.IsRawKv { - log.Info("backup raw ranges", + logutil.CL(ctx).Info("raw ranges backed up", logutil.Key("startKey", startKey), logutil.Key("endKey", endKey), zap.String("cf", req.Cf)) } else { - log.Info("backup time range", + logutil.CL(ctx).Info("time range backed up", zap.Reflect("StartVersion", req.StartVersion), zap.Reflect("EndVersion", req.EndVersion)) } @@ -592,7 +593,7 @@ func (bc *Client) fineGrainedBackup( if len(incomplete) == 0 { return nil } - log.Info("start fine grained backup", zap.Int("incomplete", len(incomplete))) + logutil.CL(ctx).Info("start fine grained backup", zap.Int("incomplete", len(incomplete))) // Step2, retry backup on incomplete range respCh := make(chan *backuppb.BackupResponse, 4) errCh := make(chan error, 4) @@ -649,12 +650,12 @@ func (bc *Client) fineGrainedBackup( break selectLoop } if resp.Error != nil { - log.Panic("unexpected backup error", + logutil.CL(ctx).Panic("unexpected backup error", zap.Reflect("error", resp.Error)) } - log.Info("put fine grained range", - logutil.Key("startKey", resp.StartKey), - logutil.Key("endKey", resp.EndKey), + logutil.CL(ctx).Info("put fine grained range", + logutil.Key("fine-grained-range-start", resp.StartKey), + logutil.Key("fine-grained-range-end", resp.EndKey), ) rangeTree.Put(resp.StartKey, resp.EndKey, resp.Files) @@ -782,11 +783,11 @@ func (bc *Client) handleFineGrained( if berrors.Is(err, berrors.ErrFailedToConnect) { // When the leader store is died, // 20s for the default max duration before the raft election timer fires. 
- log.Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID)) + logutil.CL(ctx).Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID)) return 20000, nil } - log.Error("fail to connect store", zap.Uint64("StoreID", storeID)) + logutil.CL(ctx).Error("fail to connect store", zap.Uint64("StoreID", storeID)) return 0, errors.Annotatef(err, "failed to connect to store %d", storeID) } hasProgress := false @@ -813,17 +814,17 @@ func (bc *Client) handleFineGrained( return nil }, func() (backuppb.BackupClient, error) { - log.Warn("reset the connection in handleFineGrained", zap.Uint64("storeID", storeID)) + logutil.CL(ctx).Warn("reset the connection in handleFineGrained", zap.Uint64("storeID", storeID)) return bc.mgr.ResetBackupClient(ctx, storeID) }) if err != nil { if berrors.Is(err, berrors.ErrFailedToConnect) { // When the leader store is died, // 20s for the default max duration before the raft election timer fires. - log.Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID)) + logutil.CL(ctx).Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID)) return 20000, nil } - log.Error("failed to send fine-grained backup", zap.Uint64("storeID", storeID), logutil.ShortError(err)) + logutil.CL(ctx).Error("failed to send fine-grained backup", zap.Uint64("storeID", storeID), logutil.ShortError(err)) return 0, errors.Annotatef(err, "failed to send fine-grained backup [%s, %s)", redact.Key(req.StartKey), redact.Key(req.EndKey)) } @@ -841,6 +842,7 @@ func (bc *Client) handleFineGrained( // Stop receiving response if respFn returns error. func SendBackup( ctx context.Context, + // the `storeID` seems only used for logging now, maybe we can remove it then? 
storeID uint64, client backuppb.BackupClient, req backuppb.BackupRequest, @@ -859,14 +861,11 @@ func SendBackup( var errReset error backupLoop: for retry := 0; retry < backupRetryTimes; retry++ { - log.Info("try backup", - logutil.Key("startKey", req.StartKey), - logutil.Key("endKey", req.EndKey), - zap.Uint64("storeID", storeID), + logutil.CL(ctx).Info("try backup", zap.Int("retry time", retry), ) failpoint.Inject("hint-backup-start", func(v failpoint.Value) { - log.Info("failpoint hint-backup-start injected, " + + logutil.CL(ctx).Info("failpoint hint-backup-start injected, " + "process will notify the shell.") if sigFile, ok := v.(string); ok { file, err := os.Create(sigFile) @@ -882,13 +881,13 @@ backupLoop: bcli, err := client.Backup(ctx, &req) failpoint.Inject("reset-retryable-error", func(val failpoint.Value) { if val.(bool) { - log.Debug("failpoint reset-retryable-error injected.") + logutil.CL(ctx).Debug("failpoint reset-retryable-error injected.") err = status.Error(codes.Unavailable, "Unavailable error") } }) failpoint.Inject("reset-not-retryable-error", func(val failpoint.Value) { if val.(bool) { - log.Debug("failpoint reset-not-retryable-error injected.") + logutil.CL(ctx).Debug("failpoint reset-not-retryable-error injected.") err = status.Error(codes.Unknown, "Your server was haunted hence doesn't work, meow :3") } }) @@ -902,7 +901,7 @@ backupLoop: } continue } - log.Error("fail to backup", zap.Uint64("StoreID", storeID), + logutil.CL(ctx).Error("fail to backup", zap.Uint64("StoreID", storeID), zap.Int("retry time", retry)) return berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to create backup stream to store %d", storeID) } @@ -912,9 +911,8 @@ backupLoop: resp, err := bcli.Recv() if err != nil { if errors.Cause(err) == io.EOF { // nolint:errorlint - log.Info("backup streaming finish", - zap.Uint64("StoreID", storeID), - zap.Int("retry time", retry)) + logutil.CL(ctx).Info("backup streaming finish", + zap.Int("retry-time", retry)) break 
backupLoop } if isRetryableError(err) { @@ -931,9 +929,9 @@ backupLoop: } // TODO: handle errors in the resp. - log.Info("range backuped", - logutil.Key("startKey", resp.GetStartKey()), - logutil.Key("endKey", resp.GetEndKey())) + logutil.CL(ctx).Info("range backed up", + logutil.Key("small-range-start-key", resp.GetStartKey()), + logutil.Key("small-range-end-key", resp.GetEndKey())) err = respFn(resp) if err != nil { return errors.Trace(err) diff --git a/pkg/backup/push.go b/pkg/backup/push.go index 8d933d6293032..d1adf3cb2ce1e 100644 --- a/pkg/backup/push.go +++ b/pkg/backup/push.go @@ -12,7 +12,6 @@ import ( "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/log" "go.uber.org/zap" berrors "github.com/pingcap/br/pkg/errors" @@ -67,7 +66,7 @@ func (push *pushDown) pushBackup( // Push down backup tasks to all tikv instances. res := rtree.NewRangeTree() failpoint.Inject("noop-backup", func(_ failpoint.Value) { - log.Warn("skipping normal backup, jump to fine-grained backup, meow :3", logutil.Key("start-key", req.StartKey), logutil.Key("end-key", req.EndKey)) + logutil.CL(ctx).Warn("skipping normal backup, jump to fine-grained backup, meow :3", logutil.Key("start-key", req.StartKey), logutil.Key("end-key", req.EndKey)) failpoint.Return(res, nil) }) @@ -75,22 +74,23 @@ func (push *pushDown) pushBackup( for _, s := range stores { store := s storeID := s.GetId() + lctx := logutil.ContextWithField(ctx, zap.Uint64("store-id", storeID)) if s.GetState() != metapb.StoreState_Up { - log.Warn("skip store", zap.Uint64("StoreID", storeID), zap.Stringer("State", s.GetState())) + logutil.CL(lctx).Warn("skip store", zap.Stringer("State", s.GetState())) continue } - client, err := push.mgr.GetBackupClient(ctx, storeID) + client, err := push.mgr.GetBackupClient(lctx, storeID) if err != nil { // BR should be able to backup even some of stores disconnected. 
// The regions managed by this store can be retried at fine-grained backup then. - log.Warn("fail to connect store, skipping", zap.Uint64("StoreID", storeID), zap.Error(err)) + logutil.CL(lctx).Warn("fail to connect store, skipping", zap.Error(err)) return res, nil } wg.Add(1) go func() { defer wg.Done() err := SendBackup( - ctx, storeID, client, req, + lctx, storeID, client, req, func(resp *backuppb.BackupResponse) error { // Forward all responses (including error). push.respCh <- responseAndStore{ @@ -100,8 +100,8 @@ func (push *pushDown) pushBackup( return nil }, func() (backuppb.BackupClient, error) { - log.Warn("reset the connection in push", zap.Uint64("storeID", storeID)) - return push.mgr.ResetBackupClient(ctx, storeID) + logutil.CL(lctx).Warn("reset the connection in push") + return push.mgr.ResetBackupClient(lctx, storeID) }) // Disconnected stores can be ignored. if err != nil { @@ -128,14 +128,14 @@ func (push *pushDown) pushBackup( } failpoint.Inject("backup-storage-error", func(val failpoint.Value) { msg := val.(string) - log.Debug("failpoint backup-storage-error injected.", zap.String("msg", msg)) + logutil.CL(ctx).Debug("failpoint backup-storage-error injected.", zap.String("msg", msg)) resp.Error = &backuppb.Error{ Msg: msg, } }) failpoint.Inject("tikv-rw-error", func(val failpoint.Value) { msg := val.(string) - log.Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg)) + logutil.CL(ctx).Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg)) resp.Error = &backuppb.Error{ Msg: msg, } @@ -151,28 +151,28 @@ func (push *pushDown) pushBackup( errPb := resp.GetError() switch v := errPb.Detail.(type) { case *backuppb.Error_KvError: - log.Warn("backup occur kv error", zap.Reflect("error", v)) + logutil.CL(ctx).Warn("backup occur kv error", zap.Reflect("error", v)) case *backuppb.Error_RegionError: - log.Warn("backup occur region error", zap.Reflect("error", v)) + logutil.CL(ctx).Warn("backup occur region error", zap.Reflect("error", 
v)) case *backuppb.Error_ClusterIdError: - log.Error("backup occur cluster ID error", zap.Reflect("error", v)) + logutil.CL(ctx).Error("backup occur cluster ID error", zap.Reflect("error", v)) return res, errors.Annotatef(berrors.ErrKVClusterIDMismatch, "%v", errPb) default: if utils.MessageIsRetryableStorageError(errPb.GetMsg()) { - log.Warn("backup occur storage error", zap.String("error", errPb.GetMsg())) + logutil.CL(ctx).Warn("backup occur storage error", zap.String("error", errPb.GetMsg())) continue } if utils.MessageIsNotFoundStorageError(errPb.GetMsg()) { errMsg := fmt.Sprintf("File or directory not found error occurs on TiKV Node(store id: %v; Address: %s)", store.GetId(), redact.String(store.GetAddress())) - log.Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg), + logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg), zap.String("work around", "please ensure br and tikv node share a same disk and the user of br and tikv has same uid.")) } if utils.MessageIsPermissionDeniedStorageError(errPb.GetMsg()) { errMsg := fmt.Sprintf("I/O permission denied error occurs on TiKV Node(store id: %v; Address: %s)", store.GetId(), redact.String(store.GetAddress())) - log.Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg), + logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg), zap.String("work around", "please ensure tikv has permission to read from & write to the storage.")) } return res, berrors.ErrKVStorage @@ -182,7 +182,7 @@ func (push *pushDown) pushBackup( if !berrors.Is(err, berrors.ErrFailedToConnect) { return res, errors.Annotatef(err, "failed to backup range [%s, %s)", redact.Key(req.StartKey), redact.Key(req.EndKey)) } - log.Warn("skipping disconnected stores", logutil.ShortError(err)) + logutil.CL(ctx).Warn("skipping disconnected stores", logutil.ShortError(err)) return res, nil } } diff --git a/pkg/lightning/backend/local/local.go 
b/pkg/lightning/backend/local/local.go index a54bd25359b59..f5db98aeb619e 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -50,7 +50,6 @@ import ( "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/ranger" "github.com/tikv/client-go/v2/oracle" - pd "github.com/tikv/pd/client" "go.uber.org/atomic" "go.uber.org/multierr" "go.uber.org/zap" @@ -76,6 +75,7 @@ import ( "github.com/pingcap/br/pkg/lightning/worker" "github.com/pingcap/br/pkg/logutil" "github.com/pingcap/br/pkg/membuf" + "github.com/pingcap/br/pkg/pdutil" split "github.com/pingcap/br/pkg/restore" "github.com/pingcap/br/pkg/utils" "github.com/pingcap/br/pkg/version" @@ -800,7 +800,7 @@ func (e *File) loadEngineMeta() error { type local struct { engines sync.Map // sync version of map[uuid.UUID]*File - pdCli pd.Client + pdCtl *pdutil.PdController conns common.GRPCConns splitCli split.SplitClient tls *common.TLS @@ -907,11 +907,11 @@ func NewLocalBackend( localFile := cfg.SortedKVDir rangeConcurrency := cfg.RangeConcurrency - pdCli, err := pd.NewClientWithContext(ctx, []string{pdAddr}, tls.ToPDSecurityOption()) + pdCtl, err := pdutil.NewPdController(ctx, pdAddr, tls.TLSConfig(), tls.ToPDSecurityOption()) if err != nil { return backend.MakeBackend(nil), errors.Annotate(err, "construct pd client failed") } - splitCli := split.NewSplitClient(pdCli, tls.TLSConfig()) + splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig()) shouldCreate := true if enableCheckpoint { @@ -947,7 +947,7 @@ func NewLocalBackend( local := &local{ engines: sync.Map{}, - pdCli: pdCli, + pdCtl: pdCtl, splitCli: splitCli, tls: tls, pdAddr: pdAddr, @@ -970,15 +970,15 @@ func NewLocalBackend( duplicateDB: duplicateDB, } local.conns = common.NewGRPCConns() - if err = local.checkMultiIngestSupport(ctx, pdCli); err != nil { + if err = local.checkMultiIngestSupport(ctx, pdCtl); err != nil { return backend.MakeBackend(nil), err } return backend.MakeBackend(local), nil } 
-func (local *local) checkMultiIngestSupport(ctx context.Context, pdClient pd.Client) error { - stores, err := conn.GetAllTiKVStores(ctx, pdClient, conn.SkipTiFlash) +func (local *local) checkMultiIngestSupport(ctx context.Context, pdCtl *pdutil.PdController) error { + stores, err := conn.GetAllTiKVStores(ctx, pdCtl.GetPDClient(), conn.SkipTiFlash) if err != nil { return errors.Trace(err) } @@ -1279,7 +1279,7 @@ func (local *local) allocateTSIfNotExists(ctx context.Context, engine *File) err if engine.TS > 0 { return nil } - physical, logical, err := local.pdCli.GetTS(ctx) + physical, logical, err := local.pdCtl.GetPDClient().GetTS(ctx) if err != nil { return err } @@ -1366,6 +1366,28 @@ func (local *local) WriteToTiKV( region *split.RegionInfo, start, end []byte, ) ([]*sst.SSTMeta, Range, rangeStats, error) { + for _, peer := range region.Region.GetPeers() { + var e error + for i := 0; i < maxRetryTimes; i++ { + store, err := local.pdCtl.GetStoreInfo(ctx, peer.StoreId) + if err != nil { + e = err + continue + } + if store.Status.Capacity > 0 { + // The available disk percent of TiKV + ratio := store.Status.Available * 100 / store.Status.Capacity + if ratio < 10 { + return nil, Range{}, rangeStats{}, errors.Errorf("The available disk of TiKV (%s) only left %d, and capacity is %d", + store.Store.Address, store.Status.Available, store.Status.Capacity) + } + } + break + } + if e != nil { + log.L().Error("failed to get StoreInfo from pd http api", zap.Error(e)) + } + } begin := time.Now() regionRange := intersectRange(region.Region, Range{start: start, end: end}) opt := &pebble.IterOptions{LowerBound: regionRange.start, UpperBound: regionRange.end} @@ -1878,13 +1900,18 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engineFile *File var allErrLock sync.Mutex var allErr error var wg sync.WaitGroup - - wg.Add(len(ranges)) + metErr := atomic.NewBool(false) for _, r := range ranges { startKey := r.start endKey := r.end w := local.rangeConcurrency.Apply() 
+ // if meet error here, skip try more here to allow fail fast. + if metErr.Load() { + local.rangeConcurrency.Recycle(w) + break + } + wg.Add(1) go func(w *worker.Worker) { defer func() { local.rangeConcurrency.Recycle(w) @@ -1911,6 +1938,9 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engineFile *File allErrLock.Lock() allErr = multierr.Append(allErr, err) allErrLock.Unlock() + if err != nil { + metErr.Store(true) + } }(w) } @@ -2014,7 +2044,7 @@ func (local *local) CollectLocalDuplicateRows(ctx context.Context, tbl table.Tab return nil } log.L().Info("Begin collect duplicate local keys", zap.String("table", tbl.Meta().Name.String())) - physicalTS, logicalTS, err := local.pdCli.GetTS(ctx) + physicalTS, logicalTS, err := local.pdCtl.GetPDClient().GetTS(ctx) if err != nil { return err } @@ -2033,7 +2063,7 @@ func (local *local) CollectLocalDuplicateRows(ctx context.Context, tbl table.Tab func (local *local) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table) error { log.L().Info("Begin collect remote duplicate keys", zap.String("table", tbl.Meta().Name.String())) - physicalTS, logicalTS, err := local.pdCli.GetTS(ctx) + physicalTS, logicalTS, err := local.pdCtl.GetPDClient().GetTS(ctx) if err != nil { return err } @@ -2152,43 +2182,31 @@ func sortAndMergeRanges(ranges []Range) []Range { } func filterOverlapRange(ranges []Range, finishedRanges []Range) []Range { - if len(finishedRanges) == 0 { + if len(ranges) == 0 || len(finishedRanges) == 0 { return ranges } - result := make([]Range, 0, len(ranges)) - rIdx := 0 - fIdx := 0 - for rIdx < len(ranges) && fIdx < len(finishedRanges) { - if bytes.Compare(ranges[rIdx].end, finishedRanges[fIdx].start) <= 0 { - result = append(result, ranges[rIdx]) - rIdx++ - } else if bytes.Compare(ranges[rIdx].start, finishedRanges[fIdx].end) >= 0 { - fIdx++ - } else if bytes.Compare(ranges[rIdx].start, finishedRanges[fIdx].start) < 0 { - result = append(result, Range{start: ranges[rIdx].start, end: 
finishedRanges[fIdx].start}) - switch bytes.Compare(ranges[rIdx].end, finishedRanges[fIdx].end) { - case -1: - rIdx++ - case 0: - rIdx++ - fIdx++ - case 1: - ranges[rIdx].start = finishedRanges[fIdx].end - fIdx++ + result := make([]Range, 0) + for _, r := range ranges { + start := r.start + end := r.end + for len(finishedRanges) > 0 && bytes.Compare(finishedRanges[0].start, end) < 0 { + fr := finishedRanges[0] + if bytes.Compare(fr.start, start) > 0 { + result = append(result, Range{start: start, end: fr.start}) } - } else if bytes.Compare(ranges[rIdx].end, finishedRanges[fIdx].end) > 0 { - ranges[rIdx].start = finishedRanges[fIdx].end - fIdx++ - } else { - rIdx++ + if bytes.Compare(fr.end, start) > 0 { + start = fr.end + } + if bytes.Compare(fr.end, end) > 0 { + break + } + finishedRanges = finishedRanges[1:] + } + if bytes.Compare(start, end) < 0 { + result = append(result, Range{start: start, end: end}) } } - - if rIdx < len(ranges) { - result = append(result, ranges[rIdx:]...) - } - return result } diff --git a/pkg/lightning/config/config.go b/pkg/lightning/config/config.go index 1df2fb75fdfaa..89685a9dd5592 100644 --- a/pkg/lightning/config/config.go +++ b/pkg/lightning/config/config.go @@ -17,6 +17,7 @@ import ( "context" "encoding/json" "fmt" + "math" "net" "net/url" "os" @@ -34,6 +35,7 @@ import ( filter "github.com/pingcap/tidb-tools/pkg/table-filter" router "github.com/pingcap/tidb-tools/pkg/table-router" tidbcfg "github.com/pingcap/tidb/config" + "github.com/tikv/pd/server/api" "go.uber.org/zap" "github.com/pingcap/br/pkg/lightning/common" @@ -67,9 +69,12 @@ const ( ErrorOnDup = "error" defaultDistSQLScanConcurrency = 15 + distSQLScanConcurrencyPerStore = 4 defaultBuildStatsConcurrency = 20 defaultIndexSerialScanConcurrency = 20 defaultChecksumTableConcurrency = 2 + defaultTableConcurrency = 6 + defaultIndexConcurrency = 2 // defaultMetaSchemaName is the default database name used to store lightning metadata defaultMetaSchemaName = "lightning_metadata" 
@@ -84,6 +89,10 @@ const ( autoDiskQuotaLocalReservedSpeed uint64 = 1 * units.KiB defaultEngineMemCacheSize = 512 * units.MiB defaultLocalWriterMemCacheSize = 128 * units.MiB + + maxRetryTimes = 4 + defaultRetryBackoffTime = 100 * time.Millisecond + pdStores = "/pd/api/v1/stores" ) var ( @@ -428,6 +437,7 @@ func NewConfig() *Config { MaxKVPairs: 4096, SendKVPairs: 32768, RegionSplitSize: SplitRegionSize, + DiskQuota: ByteSize(math.MaxInt64), }, PostRestore: PostRestore{ Checksum: OpLevelRequired, @@ -586,7 +596,7 @@ func (cfg *Config) Adjust(ctx context.Context) error { if cfg.App.RegionConcurrency > cpuCount { cfg.App.RegionConcurrency = cpuCount } - cfg.DefaultVarsForImporterAndLocalBackend() + cfg.DefaultVarsForImporterAndLocalBackend(ctx) default: return errors.Errorf("invalid config: unsupported `tikv-importer.backend` (%s)", cfg.TikvImporter.Backend) } @@ -672,45 +682,58 @@ func (cfg *Config) CheckAndAdjustForLocalBackend() error { return errors.Annotate(err, "invalid tikv-importer.sorted-kv-dir") } - // we need to calculate quota if disk-quota == 0 - if cfg.TikvImporter.DiskQuota == 0 { - enginesCount := uint64(cfg.App.IndexConcurrency + cfg.App.TableConcurrency) - writeAmount := uint64(cfg.App.RegionConcurrency) * uint64(cfg.Cron.CheckDiskQuota.Milliseconds()) - reservedSize := enginesCount*uint64(cfg.TikvImporter.EngineMemCacheSize) + writeAmount*autoDiskQuotaLocalReservedSpeed - - storageSize, err := common.GetStorageSize(storageSizeDir) - if err != nil { - return errors.Trace(err) - } - if storageSize.Available <= reservedSize { - return errors.Errorf( - "insufficient disk free space on `%s` (only %s, expecting >%s), please use a storage with enough free space, or specify `tikv-importer.disk-quota`", - cfg.TikvImporter.SortedKVDir, - units.BytesSize(float64(storageSize.Available)), - units.BytesSize(float64(reservedSize))) - } - cfg.TikvImporter.DiskQuota = ByteSize(storageSize.Available - reservedSize) - } - return nil } func (cfg *Config) 
DefaultVarsForTiDBBackend() { + if cfg.App.TableConcurrency == 0 { + cfg.App.TableConcurrency = cfg.App.RegionConcurrency + } if cfg.App.IndexConcurrency == 0 { cfg.App.IndexConcurrency = cfg.App.RegionConcurrency } - if cfg.App.TableConcurrency == 0 { - cfg.App.TableConcurrency = cfg.App.RegionConcurrency +} + +func (cfg *Config) adjustDistSQLConcurrency(ctx context.Context) error { + tls, err := cfg.ToTLS() + if err != nil { + return err + } + result := &api.StoresInfo{} + err = tls.WithHost(cfg.TiDB.PdAddr).GetJSON(ctx, pdStores, result) + if err != nil { + return errors.Trace(err) } + cfg.TiDB.DistSQLScanConcurrency = len(result.Stores) * distSQLScanConcurrencyPerStore + if cfg.TiDB.DistSQLScanConcurrency < defaultDistSQLScanConcurrency { + cfg.TiDB.DistSQLScanConcurrency = defaultDistSQLScanConcurrency + } + log.L().Info("adjust scan concurrency success", zap.Int("DistSQLScanConcurrency", cfg.TiDB.DistSQLScanConcurrency)) + return nil } -func (cfg *Config) DefaultVarsForImporterAndLocalBackend() { +func (cfg *Config) DefaultVarsForImporterAndLocalBackend(ctx context.Context) { + if cfg.TiDB.DistSQLScanConcurrency == defaultDistSQLScanConcurrency { + var e error + for i := 0; i < maxRetryTimes; i++ { + e = cfg.adjustDistSQLConcurrency(ctx) + if e == nil { + break + } + time.Sleep(defaultRetryBackoffTime) + } + if e != nil { + log.L().Error("failed to adjust scan concurrency", zap.Error(e)) + } + } + if cfg.App.IndexConcurrency == 0 { - cfg.App.IndexConcurrency = 2 + cfg.App.IndexConcurrency = defaultIndexConcurrency } if cfg.App.TableConcurrency == 0 { - cfg.App.TableConcurrency = 6 + cfg.App.TableConcurrency = defaultTableConcurrency } + if len(cfg.App.MetaSchemaName) == 0 { cfg.App.MetaSchemaName = defaultMetaSchemaName } @@ -720,9 +743,6 @@ func (cfg *Config) DefaultVarsForImporterAndLocalBackend() { if cfg.TikvImporter.RegionSplitSize == 0 { cfg.TikvImporter.RegionSplitSize = SplitRegionSize } - if cfg.TiDB.DistSQLScanConcurrency == 0 { - 
cfg.TiDB.DistSQLScanConcurrency = defaultDistSQLScanConcurrency - } if cfg.TiDB.BuildStatsConcurrency == 0 { cfg.TiDB.BuildStatsConcurrency = defaultBuildStatsConcurrency } @@ -840,11 +860,6 @@ func (cfg *Config) AdjustCheckPoint() { } func (cfg *Config) AdjustMydumper() { - if cfg.Mydumper.BatchSize <= 0 { - // if rows in source files are not sorted by primary key(if primary is number or cluster index enabled), - // the key range in each data engine may have overlap, thus a bigger engine size can somewhat alleviate it. - cfg.Mydumper.BatchSize = defaultBatchSize - } if cfg.Mydumper.BatchImportRatio < 0.0 || cfg.Mydumper.BatchImportRatio >= 1.0 { cfg.Mydumper.BatchImportRatio = 0.75 } diff --git a/pkg/lightning/config/config_test.go b/pkg/lightning/config/config_test.go index b35fd002f43d9..fd000874f693c 100644 --- a/pkg/lightning/config/config_test.go +++ b/pkg/lightning/config/config_test.go @@ -82,6 +82,7 @@ func (s *configTestSuite) TestAdjustPdAddrAndPort(c *C) { cfg.Mydumper.SourceDir = "." cfg.TikvImporter.Backend = config.BackendLocal cfg.TikvImporter.SortedKVDir = "." + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, IsNil) @@ -101,6 +102,7 @@ func (s *configTestSuite) TestAdjustPdAddrAndPortViaAdvertiseAddr(c *C) { cfg.Mydumper.SourceDir = "." cfg.TikvImporter.Backend = config.BackendLocal cfg.TikvImporter.SortedKVDir = "." + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, IsNil) @@ -117,6 +119,7 @@ func (s *configTestSuite) TestAdjustPageNotFound(c *C) { cfg.TiDB.StatusPort = port cfg.TikvImporter.Backend = config.BackendLocal cfg.TikvImporter.SortedKVDir = "." 
+ cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, ErrorMatches, "cannot fetch settings from TiDB.*") @@ -130,6 +133,7 @@ func (s *configTestSuite) TestAdjustConnectRefused(c *C) { cfg.TiDB.StatusPort = port cfg.TikvImporter.Backend = config.BackendLocal cfg.TikvImporter.SortedKVDir = "." + cfg.TiDB.DistSQLScanConcurrency = 1 ts.Close() // immediately close to ensure connection refused. @@ -139,6 +143,7 @@ func (s *configTestSuite) TestAdjustConnectRefused(c *C) { func (s *configTestSuite) TestAdjustBackendNotSet(c *C) { cfg := config.NewConfig() + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, ErrorMatches, "tikv-importer.backend must not be empty!") } @@ -146,6 +151,7 @@ func (s *configTestSuite) TestAdjustBackendNotSet(c *C) { func (s *configTestSuite) TestAdjustInvalidBackend(c *C) { cfg := config.NewConfig() cfg.TikvImporter.Backend = "no_such_backend" + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, ErrorMatches, "invalid config: unsupported `tikv-importer\\.backend` \\(no_such_backend\\)") } @@ -160,6 +166,7 @@ func (s *configTestSuite) TestAdjustFileRoutePath(c *C) { invalidPath := filepath.Join(tmpDir, "../test123/1.sql") rule := &config.FileRouteRule{Path: invalidPath, Type: "sql", Schema: "test", Table: "tbl"} cfg.Mydumper.FileRouters = []*config.FileRouteRule{rule} + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(ctx) c.Assert(err, ErrorMatches, fmt.Sprintf("\\Qfile route path '%s' is not in source dir '%s'\\E", invalidPath, tmpDir)) @@ -179,6 +186,7 @@ func (s *configTestSuite) TestDecodeError(c *C) { cfg.TiDB.StatusPort = port cfg.TikvImporter.Backend = config.BackendLocal cfg.TikvImporter.SortedKVDir = "." 
+ cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, ErrorMatches, "cannot fetch settings from TiDB.*") @@ -193,6 +201,7 @@ func (s *configTestSuite) TestInvalidSetting(c *C) { cfg.TiDB.StatusPort = port cfg.TikvImporter.Backend = config.BackendLocal cfg.TikvImporter.SortedKVDir = "." + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, ErrorMatches, "invalid `tidb.port` setting") @@ -207,6 +216,7 @@ func (s *configTestSuite) TestInvalidPDAddr(c *C) { cfg.TiDB.StatusPort = port cfg.TikvImporter.Backend = config.BackendLocal cfg.TikvImporter.SortedKVDir = "." + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, ErrorMatches, "invalid `tidb.pd-addr` setting") @@ -215,6 +225,7 @@ func (s *configTestSuite) TestInvalidPDAddr(c *C) { func (s *configTestSuite) TestAdjustWillNotContactServerIfEverythingIsDefined(c *C) { cfg := config.NewConfig() assignMinimalLegalValue(cfg) + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, IsNil) @@ -226,6 +237,7 @@ func (s *configTestSuite) TestAdjustWillBatchImportRatioInvalid(c *C) { cfg := config.NewConfig() assignMinimalLegalValue(cfg) cfg.Mydumper.BatchImportRatio = -1 + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, IsNil) c.Assert(cfg.Mydumper.BatchImportRatio, Equals, 0.75) @@ -302,6 +314,7 @@ func (s *configTestSuite) TestAdjustSecuritySection(c *C) { cfg := config.NewConfig() assignMinimalLegalValue(cfg) + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.LoadFromTOML([]byte(tc.input)) c.Assert(err, IsNil, comment) @@ -443,6 +456,7 @@ func (s *configTestSuite) TestInvalidCSV(c *C) { cfg.TiDB.PdAddr = "test.invalid:2379" cfg.TikvImporter.Backend = config.BackendLocal cfg.TikvImporter.SortedKVDir = "." 
+ cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.LoadFromTOML([]byte(tc.input)) c.Assert(err, IsNil) @@ -546,6 +560,7 @@ func (s *configTestSuite) TestLoadConfig(c *C) { taskCfg.Checkpoint.DSN = "" taskCfg.Checkpoint.Driver = config.CheckpointDriverMySQL + taskCfg.TiDB.DistSQLScanConcurrency = 1 err = taskCfg.Adjust(context.Background()) c.Assert(err, IsNil) c.Assert(taskCfg.Checkpoint.DSN, Equals, "guest:12345@tcp(172.16.30.11:4001)/?charset=utf8mb4&sql_mode='"+mysql.DefaultSQLMode+"'&maxAllowedPacket=67108864&tls=false") @@ -558,6 +573,7 @@ func (s *configTestSuite) TestDefaultImporterBackendValue(c *C) { cfg := config.NewConfig() assignMinimalLegalValue(cfg) cfg.TikvImporter.Backend = "importer" + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, IsNil) c.Assert(cfg.App.IndexConcurrency, Equals, 2) @@ -569,9 +585,9 @@ func (s *configTestSuite) TestDefaultTidbBackendValue(c *C) { assignMinimalLegalValue(cfg) cfg.TikvImporter.Backend = "tidb" cfg.App.RegionConcurrency = 123 + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, IsNil) - c.Assert(cfg.App.IndexConcurrency, Equals, 123) c.Assert(cfg.App.TableConcurrency, Equals, 123) } @@ -581,6 +597,7 @@ func (s *configTestSuite) TestDefaultCouldBeOverwritten(c *C) { cfg.TikvImporter.Backend = "importer" cfg.App.IndexConcurrency = 20 cfg.App.TableConcurrency = 60 + cfg.TiDB.DistSQLScanConcurrency = 1 err := cfg.Adjust(context.Background()) c.Assert(err, IsNil) c.Assert(cfg.App.IndexConcurrency, Equals, 20) @@ -668,6 +685,7 @@ func (s *configTestSuite) TestAdjustWithLegacyBlackWhiteList(c *C) { ctx := context.Background() cfg.Mydumper.Filter = []string{"test.*"} + cfg.TiDB.DistSQLScanConcurrency = 1 c.Assert(cfg.Adjust(ctx), IsNil) c.Assert(cfg.HasLegacyBlackWhiteList(), IsFalse) @@ -688,7 +706,7 @@ func (s *configTestSuite) TestAdjustDiskQuota(c *C) { cfg.TikvImporter.Backend = config.BackendLocal cfg.TikvImporter.DiskQuota = 0 
cfg.TikvImporter.SortedKVDir = base + cfg.TiDB.DistSQLScanConcurrency = 1 c.Assert(cfg.Adjust(ctx), IsNil) - // DiskQuota must greater than 0 after adjust - c.Assert(int64(cfg.TikvImporter.DiskQuota), Greater, int64(0)) + c.Assert(int64(cfg.TikvImporter.DiskQuota), Equals, int64(0)) } diff --git a/pkg/lightning/config/const.go b/pkg/lightning/config/const.go index 240d37e27c722..21289e70bdf95 100644 --- a/pkg/lightning/config/const.go +++ b/pkg/lightning/config/const.go @@ -28,5 +28,5 @@ const ( defaultMaxAllowedPacket = 64 * units.MiB - defaultBatchSize ByteSize = 100 * units.GiB + DefaultBatchSize ByteSize = 100 * units.GiB ) diff --git a/pkg/lightning/lightning_test.go b/pkg/lightning/lightning_test.go index db7699ba021f4..bcd0260927abf 100644 --- a/pkg/lightning/lightning_test.go +++ b/pkg/lightning/lightning_test.go @@ -244,7 +244,7 @@ func (s *lightningServerSuite) TestGetDeleteTask(c *C) { go func() { _ = s.lightning.RunServer() }() - time.Sleep(100 * time.Millisecond) + time.Sleep(500 * time.Millisecond) // Check `GET /tasks` without any active tasks @@ -376,7 +376,7 @@ func (s *lightningServerSuite) TestHTTPAPIOutsideServerMode(c *C) { go func() { errCh <- s.lightning.RunOnce(s.lightning.ctx, cfg, nil) }() - time.Sleep(100 * time.Millisecond) + time.Sleep(600 * time.Millisecond) var curTask struct { Current int64 diff --git a/pkg/lightning/mydump/csv_parser.go b/pkg/lightning/mydump/csv_parser.go index 64905669ecbfa..241ff7fc776a6 100644 --- a/pkg/lightning/mydump/csv_parser.go +++ b/pkg/lightning/mydump/csv_parser.go @@ -445,6 +445,7 @@ func (parser *CSVParser) replaceEOF(err error, replaced error) error { // ReadRow reads a row from the datafile. 
func (parser *CSVParser) ReadRow() error { row := &parser.lastRow + row.Length = 0 row.RowID++ // skip the header first @@ -476,6 +477,7 @@ func (parser *CSVParser) ReadRow() error { row.Row = make([]types.Datum, len(records)) } for i, record := range records { + row.Length += len(record) unescaped, isNull := parser.unescapeString(record) if isNull { row.Row[i].SetNull() diff --git a/pkg/lightning/mydump/csv_parser_test.go b/pkg/lightning/mydump/csv_parser_test.go index c911674d7185c..18d3a71fe40bf 100644 --- a/pkg/lightning/mydump/csv_parser_test.go +++ b/pkg/lightning/mydump/csv_parser_test.go @@ -60,7 +60,9 @@ func (s *testMydumpCSVParserSuite) runTestCases(c *C, cfg *config.CSVConfig, blo comment := Commentf("input = %q, row = %d", tc.input, i+1) e := parser.ReadRow() c.Assert(e, IsNil, Commentf("input = %q, row = %d, error = %s", tc.input, i+1, errors.ErrorStack(e))) - c.Assert(parser.LastRow(), DeepEquals, mydump.Row{RowID: int64(i) + 1, Row: row}, comment) + c.Assert(parser.LastRow().RowID, DeepEquals, int64(i)+1, comment) + c.Assert(parser.LastRow().Row, DeepEquals, row, comment) + } c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF, Commentf("input = %q", tc.input)) } @@ -150,22 +152,25 @@ func (s *testMydumpCSVParserSuite) TestTPCH(c *C) { c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ - RowID: 1, - Row: datums[0], + RowID: 1, + Row: datums[0], + Length: 116, }) c.Assert(parser, posEq, 126, 1) c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ - RowID: 2, - Row: datums[1], + RowID: 2, + Row: datums[1], + Length: 104, }) c.Assert(parser, posEq, 241, 2) c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ - RowID: 3, - Row: datums[2], + RowID: 3, + Row: datums[2], + Length: 117, }) c.Assert(parser, posEq, 369, 3) @@ -225,10 +230,9 @@ func (s *testMydumpCSVParserSuite) TestTPCHMultiBytes(c *C) { for i, expectedParserPos := range allExpectedParserPos 
{ c.Assert(parser.ReadRow(), IsNil) - c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ - RowID: int64(i + 1), - Row: datums[i], - }) + c.Assert(parser.LastRow().RowID, DeepEquals, int64(i+1)) + c.Assert(parser.LastRow().Row, DeepEquals, datums[i]) + c.Assert(parser, posEq, expectedParserPos, i+1) } @@ -254,6 +258,7 @@ func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) { types.NewStringDatum("bbb"), types.NewStringDatum("ccc"), }, + Length: 9, }) c.Assert(parser, posEq, 12, 1) @@ -265,6 +270,7 @@ func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) { types.NewStringDatum("yyy"), types.NewStringDatum("xxx"), }, + Length: 9, }) c.Assert(parser, posEq, 24, 2) @@ -282,6 +288,7 @@ func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) { types.NewStringDatum("bbb"), types.NewStringDatum("ccc"), }, + Length: 9, }) c.Assert(parser, posEq, 12, 1) @@ -293,6 +300,7 @@ func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) { types.NewStringDatum("yyy"), types.NewStringDatum("xxx"), }, + Length: 9, }) c.Assert(parser, posEq, 23, 2) @@ -310,6 +318,7 @@ func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) { types.NewStringDatum("bbb"), types.NewStringDatum("ccc"), }, + Length: 9, }) c.Assert(parser, posEq, 18, 1) @@ -321,6 +330,7 @@ func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) { types.NewStringDatum("yyy"), types.NewStringDatum("xxx"), }, + Length: 9, }) c.Assert(parser, posEq, 29, 2) @@ -340,6 +350,7 @@ zzz,yyy,xxx`), int64(config.ReadBlockSize), s.ioWorkers, false) types.NewStringDatum("b\nbb"), types.NewStringDatum("ccc"), }, + Length: 10, }) c.Assert(parser, posEq, 19, 1) @@ -351,6 +362,7 @@ zzz,yyy,xxx`), int64(config.ReadBlockSize), s.ioWorkers, false) types.NewStringDatum("yyy"), types.NewStringDatum("xxx"), }, + Length: 9, }) c.Assert(parser, posEq, 30, 2) @@ -368,6 +380,7 @@ zzz,yyy,xxx`), int64(config.ReadBlockSize), s.ioWorkers, false) types.NewStringDatum("b\"bb"), types.NewStringDatum("ccc"), }, + Length: 10, }) c.Assert(parser, posEq, 19, 1) @@ -395,6 +408,7 
@@ func (s *testMydumpCSVParserSuite) TestMySQL(c *C) { types.NewStringDatum(`\`), types.NewStringDatum("?"), }, + Length: 6, }) c.Assert(parser, posEq, 15, 1) @@ -406,6 +420,7 @@ func (s *testMydumpCSVParserSuite) TestMySQL(c *C) { nullDatum, types.NewStringDatum(`\N`), }, + Length: 7, }) c.Assert(parser, posEq, 26, 2) @@ -463,6 +478,7 @@ func (s *testMydumpCSVParserSuite) TestTSV(c *C) { types.NewStringDatum("foo"), types.NewStringDatum("0000-00-00"), }, + Length: 14, }) c.Assert(parser, posEq, 32, 1) c.Assert(parser.Columns(), DeepEquals, []string{"a", "b", "c", "d", "e", "f"}) @@ -478,6 +494,7 @@ func (s *testMydumpCSVParserSuite) TestTSV(c *C) { types.NewStringDatum("foo"), types.NewStringDatum("0000-00-00"), }, + Length: 14, }) c.Assert(parser, posEq, 52, 2) @@ -492,6 +509,7 @@ func (s *testMydumpCSVParserSuite) TestTSV(c *C) { types.NewStringDatum("bar"), types.NewStringDatum("1999-12-31"), }, + Length: 23, }) c.Assert(parser, posEq, 80, 3) @@ -513,6 +531,7 @@ func (s *testMydumpCSVParserSuite) TestCsvWithWhiteSpaceLine(c *C) { nullDatum, types.NewStringDatum("abc"), }, + Length: 4, }) c.Assert(parser, posEq, 12, 1) @@ -524,6 +543,7 @@ func (s *testMydumpCSVParserSuite) TestCsvWithWhiteSpaceLine(c *C) { types.NewStringDatum("1999-12-31"), types.NewStringDatum("test"), }, + Length: 17, }) c.Assert(parser.Close(), IsNil) @@ -539,6 +559,7 @@ func (s *testMydumpCSVParserSuite) TestCsvWithWhiteSpaceLine(c *C) { nullDatum, types.NewStringDatum("abc"), }, + Length: 4, }) c.Assert(parser, posEq, 17, 1) @@ -574,26 +595,30 @@ func (s *testMydumpCSVParserSuite) TestCRLF(c *C) { c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ - RowID: 1, - Row: []types.Datum{types.NewStringDatum("a")}, + RowID: 1, + Row: []types.Datum{types.NewStringDatum("a")}, + Length: 1, }) c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ - RowID: 2, - Row: []types.Datum{types.NewStringDatum("b")}, + RowID: 2, + Row: 
[]types.Datum{types.NewStringDatum("b")}, + Length: 1, }) c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ - RowID: 3, - Row: []types.Datum{types.NewStringDatum("c")}, + RowID: 3, + Row: []types.Datum{types.NewStringDatum("c")}, + Length: 1, }) c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ - RowID: 4, - Row: []types.Datum{types.NewStringDatum("d")}, + RowID: 4, + Row: []types.Datum{types.NewStringDatum("d")}, + Length: 1, }) c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF) @@ -614,6 +639,7 @@ func (s *testMydumpCSVParserSuite) TestQuotedSeparator(c *C) { types.NewStringDatum("'"), types.NewStringDatum("'"), }, + Length: 3, }) c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF) diff --git a/pkg/lightning/mydump/loader.go b/pkg/lightning/mydump/loader.go index fc14bc0af8e8a..caac10a6a0202 100644 --- a/pkg/lightning/mydump/loader.go +++ b/pkg/lightning/mydump/loader.go @@ -37,12 +37,14 @@ type MDDatabaseMeta struct { } type MDTableMeta struct { - DB string - Name string - SchemaFile FileInfo - DataFiles []FileInfo - charSet string - TotalSize int64 + DB string + Name string + SchemaFile FileInfo + DataFiles []FileInfo + charSet string + TotalSize int64 + IndexRatio float64 + IsRowOrdered bool } type SourceFileMeta struct { @@ -427,11 +429,13 @@ func (s *mdLoaderSetup) insertTable(fileInfo FileInfo) (*MDTableMeta, bool, bool } s.tableIndexMap[fileInfo.TableName] = len(dbMeta.Tables) ptr := &MDTableMeta{ - DB: fileInfo.TableName.Schema, - Name: fileInfo.TableName.Name, - SchemaFile: fileInfo, - DataFiles: make([]FileInfo, 0, 16), - charSet: s.loader.charSet, + DB: fileInfo.TableName.Schema, + Name: fileInfo.TableName.Name, + SchemaFile: fileInfo, + DataFiles: make([]FileInfo, 0, 16), + charSet: s.loader.charSet, + IndexRatio: 0.0, + IsRowOrdered: true, } dbMeta.Tables = append(dbMeta.Tables, ptr) return ptr, dbExists, false @@ -442,10 +446,12 @@ func (s *mdLoaderSetup) 
insertView(fileInfo FileInfo) (bool, bool) { _, ok := s.tableIndexMap[fileInfo.TableName] if ok { meta := &MDTableMeta{ - DB: fileInfo.TableName.Schema, - Name: fileInfo.TableName.Name, - SchemaFile: fileInfo, - charSet: s.loader.charSet, + DB: fileInfo.TableName.Schema, + Name: fileInfo.TableName.Name, + SchemaFile: fileInfo, + charSet: s.loader.charSet, + IndexRatio: 0.0, + IsRowOrdered: true, } dbMeta.Views = append(dbMeta.Views, meta) } diff --git a/pkg/lightning/mydump/loader_test.go b/pkg/lightning/mydump/loader_test.go index bea6333a04b16..1c7a8eb3bd84f 100644 --- a/pkg/lightning/mydump/loader_test.go +++ b/pkg/lightning/mydump/loader_test.go @@ -275,10 +275,12 @@ func (s *testMydumpLoaderSuite) TestDataWithoutSchema(c *C) { Name: "db", SchemaFile: "", Tables: []*md.MDTableMeta{{ - DB: "db", - Name: "tbl", - SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "db", Name: "tbl"}}, - DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "db", Name: "tbl"}, FileMeta: md.SourceFileMeta{Path: "db.tbl.sql", Type: md.SourceTypeSQL}}}, + DB: "db", + Name: "tbl", + SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "db", Name: "tbl"}}, + DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "db", Name: "tbl"}, FileMeta: md.SourceFileMeta{Path: "db.tbl.sql", Type: md.SourceTypeSQL}}}, + IsRowOrdered: true, + IndexRatio: 0.0, }}, }}) } @@ -303,16 +305,20 @@ func (s *testMydumpLoaderSuite) TestTablesWithDots(c *C) { SchemaFile: "db-schema-create.sql", Tables: []*md.MDTableMeta{ { - DB: "db", - Name: "0002", - SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "db", Name: "0002"}, FileMeta: md.SourceFileMeta{Path: "db.0002-schema.sql", Type: md.SourceTypeTableSchema}}, - DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "db", Name: "0002"}, FileMeta: md.SourceFileMeta{Path: "db.0002.sql", Type: md.SourceTypeSQL}}}, + DB: "db", + Name: "0002", + SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "db", Name: "0002"}, FileMeta: 
md.SourceFileMeta{Path: "db.0002-schema.sql", Type: md.SourceTypeTableSchema}}, + DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "db", Name: "0002"}, FileMeta: md.SourceFileMeta{Path: "db.0002.sql", Type: md.SourceTypeSQL}}}, + IsRowOrdered: true, + IndexRatio: 0.0, }, { - DB: "db", - Name: "tbl.with.dots", - SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "db", Name: "tbl.with.dots"}, FileMeta: md.SourceFileMeta{Path: "db.tbl.with.dots-schema.sql", Type: md.SourceTypeTableSchema}}, - DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "db", Name: "tbl.with.dots"}, FileMeta: md.SourceFileMeta{Path: "db.tbl.with.dots.0001.sql", Type: md.SourceTypeSQL, SortKey: "0001"}}}, + DB: "db", + Name: "tbl.with.dots", + SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "db", Name: "tbl.with.dots"}, FileMeta: md.SourceFileMeta{Path: "db.tbl.with.dots-schema.sql", Type: md.SourceTypeTableSchema}}, + DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "db", Name: "tbl.with.dots"}, FileMeta: md.SourceFileMeta{Path: "db.tbl.with.dots.0001.sql", Type: md.SourceTypeSQL, SortKey: "0001"}}}, + IsRowOrdered: true, + IndexRatio: 0.0, }, }, }}) @@ -393,23 +399,29 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) { SchemaFile: "a1-schema-create.sql", Tables: []*md.MDTableMeta{ { - DB: "a1", - Name: "s1", - SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "a1", Name: "s1"}, FileMeta: md.SourceFileMeta{Path: "a1.s1-schema.sql", Type: md.SourceTypeTableSchema}}, - DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "a1", Name: "s1"}, FileMeta: md.SourceFileMeta{Path: "a1.s1.1.sql", Type: md.SourceTypeSQL, SortKey: "1"}}}, + DB: "a1", + Name: "s1", + SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "a1", Name: "s1"}, FileMeta: md.SourceFileMeta{Path: "a1.s1-schema.sql", Type: md.SourceTypeTableSchema}}, + DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "a1", Name: "s1"}, FileMeta: md.SourceFileMeta{Path: "a1.s1.1.sql", Type: 
md.SourceTypeSQL, SortKey: "1"}}}, + IndexRatio: 0.0, + IsRowOrdered: true, }, { - DB: "a1", - Name: "v1", - SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "a1", Name: "v1"}, FileMeta: md.SourceFileMeta{Path: "a1.v1-schema.sql", Type: md.SourceTypeTableSchema}}, - DataFiles: []md.FileInfo{}, + DB: "a1", + Name: "v1", + SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "a1", Name: "v1"}, FileMeta: md.SourceFileMeta{Path: "a1.v1-schema.sql", Type: md.SourceTypeTableSchema}}, + DataFiles: []md.FileInfo{}, + IndexRatio: 0.0, + IsRowOrdered: true, }, }, Views: []*md.MDTableMeta{ { - DB: "a1", - Name: "v1", - SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "a1", Name: "v1"}, FileMeta: md.SourceFileMeta{Path: "a1.v1-schema-view.sql", Type: md.SourceTypeViewSchema}}, + DB: "a1", + Name: "v1", + SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "a1", Name: "v1"}, FileMeta: md.SourceFileMeta{Path: "a1.v1-schema-view.sql", Type: md.SourceTypeViewSchema}}, + IndexRatio: 0.0, + IsRowOrdered: true, }, }, }, @@ -430,6 +442,8 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) { {TableName: filter.Table{Schema: "b", Name: "u"}, FileMeta: md.SourceFileMeta{Path: "a0.t1.1.sql", Type: md.SourceTypeSQL, SortKey: "1"}}, {TableName: filter.Table{Schema: "b", Name: "u"}, FileMeta: md.SourceFileMeta{Path: "a1.t2.1.sql", Type: md.SourceTypeSQL, SortKey: "1"}}, }, + IndexRatio: 0.0, + IsRowOrdered: true, }, }, }, @@ -438,10 +452,12 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) { SchemaFile: "c0-schema-create.sql", Tables: []*md.MDTableMeta{ { - DB: "c", - Name: "t3", - SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "c", Name: "t3"}, FileMeta: md.SourceFileMeta{Path: "c0.t3-schema.sql", Type: md.SourceTypeTableSchema}}, - DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "c", Name: "t3"}, FileMeta: md.SourceFileMeta{Path: "c0.t3.1.sql", Type: md.SourceTypeSQL, SortKey: "1"}}}, + DB: "c", + Name: "t3", + SchemaFile: md.FileInfo{TableName: 
filter.Table{Schema: "c", Name: "t3"}, FileMeta: md.SourceFileMeta{Path: "c0.t3-schema.sql", Type: md.SourceTypeTableSchema}}, + DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "c", Name: "t3"}, FileMeta: md.SourceFileMeta{Path: "c0.t3.1.sql", Type: md.SourceTypeSQL, SortKey: "1"}}}, + IndexRatio: 0.0, + IsRowOrdered: true, }, }, }, @@ -450,17 +466,21 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) { SchemaFile: "e0-schema-create.sql", Tables: []*md.MDTableMeta{ { - DB: "v", - Name: "vv", - SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "v", Name: "vv"}, FileMeta: md.SourceFileMeta{Path: "e0.f0-schema.sql", Type: md.SourceTypeTableSchema}}, - DataFiles: []md.FileInfo{}, + DB: "v", + Name: "vv", + SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "v", Name: "vv"}, FileMeta: md.SourceFileMeta{Path: "e0.f0-schema.sql", Type: md.SourceTypeTableSchema}}, + DataFiles: []md.FileInfo{}, + IndexRatio: 0.0, + IsRowOrdered: true, }, }, Views: []*md.MDTableMeta{ { - DB: "v", - Name: "vv", - SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "v", Name: "vv"}, FileMeta: md.SourceFileMeta{Path: "e0.f0-schema-view.sql", Type: md.SourceTypeViewSchema}}, + DB: "v", + Name: "vv", + SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "v", Name: "vv"}, FileMeta: md.SourceFileMeta{Path: "e0.f0-schema-view.sql", Type: md.SourceTypeViewSchema}}, + IndexRatio: 0.0, + IsRowOrdered: true, }, }, }, @@ -555,6 +575,8 @@ func (s *testMydumpLoaderSuite) TestFileRouting(c *C) { FileMeta: md.SourceFileMeta{Path: filepath.FromSlash("d1/test2.001.sql"), Type: md.SourceTypeSQL}, }, }, + IndexRatio: 0.0, + IsRowOrdered: true, }, { DB: "d1", @@ -563,7 +585,9 @@ func (s *testMydumpLoaderSuite) TestFileRouting(c *C) { TableName: filter.Table{Schema: "d1", Name: "v1"}, FileMeta: md.SourceFileMeta{Path: filepath.FromSlash("d1/v1-table.sql"), Type: md.SourceTypeTableSchema}, }, - DataFiles: []md.FileInfo{}, + DataFiles: []md.FileInfo{}, + IndexRatio: 0.0, + IsRowOrdered: 
true, }, }, Views: []*md.MDTableMeta{ @@ -574,6 +598,8 @@ func (s *testMydumpLoaderSuite) TestFileRouting(c *C) { TableName: filter.Table{Schema: "d1", Name: "v1"}, FileMeta: md.SourceFileMeta{Path: filepath.FromSlash("d1/v1-view.sql"), Type: md.SourceTypeViewSchema}, }, + IndexRatio: 0.0, + IsRowOrdered: true, }, }, }, @@ -588,7 +614,9 @@ func (s *testMydumpLoaderSuite) TestFileRouting(c *C) { TableName: filter.Table{Schema: "d2", Name: "abc"}, FileMeta: md.SourceFileMeta{Path: filepath.FromSlash("d2/abc-table.sql"), Type: md.SourceTypeTableSchema}, }, - DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "d2", Name: "abc"}, FileMeta: md.SourceFileMeta{Path: "abc.1.sql", Type: md.SourceTypeSQL}}}, + DataFiles: []md.FileInfo{{TableName: filter.Table{Schema: "d2", Name: "abc"}, FileMeta: md.SourceFileMeta{Path: "abc.1.sql", Type: md.SourceTypeSQL}}}, + IndexRatio: 0.0, + IsRowOrdered: true, }, }, }, diff --git a/pkg/lightning/mydump/parquet_parser.go b/pkg/lightning/mydump/parquet_parser.go index 303ea42ca81bd..48320e0fba2d5 100644 --- a/pkg/lightning/mydump/parquet_parser.go +++ b/pkg/lightning/mydump/parquet_parser.go @@ -346,6 +346,7 @@ func (pp *ParquetParser) Close() error { func (pp *ParquetParser) ReadRow() error { pp.lastRow.RowID++ + pp.lastRow.Length = 0 if pp.curIndex >= len(pp.rows) { if pp.readRows >= pp.Reader.GetNumRows() { return io.EOF @@ -376,6 +377,7 @@ func (pp *ParquetParser) ReadRow() error { pp.lastRow.Row = pp.lastRow.Row[:length] } for i := 0; i < length; i++ { + pp.lastRow.Length += getDatumLen(v.Field(i)) if err := setDatumValue(&pp.lastRow.Row[i], v.Field(i), pp.columnMetas[i]); err != nil { return err } @@ -383,6 +385,20 @@ func (pp *ParquetParser) ReadRow() error { return nil } +func getDatumLen(v reflect.Value) int { + if v.Kind() == reflect.Ptr { + if v.IsNil() { + return 0 + } else { + return getDatumLen(v.Elem()) + } + } + if v.Kind() == reflect.String { + return len(v.String()) + } + return 8 +} + // convert a parquet value 
to Datum // // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md diff --git a/pkg/lightning/mydump/parser.go b/pkg/lightning/mydump/parser.go index 18079199e7361..668f519e67938 100644 --- a/pkg/lightning/mydump/parser.go +++ b/pkg/lightning/mydump/parser.go @@ -92,8 +92,9 @@ type Chunk struct { // Row is the content of a row. type Row struct { - RowID int64 - Row []types.Datum + RowID int64 + Row []types.Datum + Length int } // MarshalLogArray implements the zapcore.ArrayMarshaler interface @@ -413,6 +414,7 @@ func (parser *ChunkParser) ReadRow() error { row := &parser.lastRow st := stateValues + row.Length = 0 for { tok, content, err := parser.lex() @@ -422,6 +424,7 @@ func (parser *ChunkParser) ReadRow() error { } return errors.Trace(err) } + row.Length += len(content) switch st { case stateTableName: switch tok { diff --git a/pkg/lightning/mydump/parser_test.go b/pkg/lightning/mydump/parser_test.go index 53ee5e1192872..2bc2eb97d556c 100644 --- a/pkg/lightning/mydump/parser_test.go +++ b/pkg/lightning/mydump/parser_test.go @@ -45,7 +45,8 @@ func (s *testMydumpParserSuite) runTestCases(c *C, mode mysql.SQLMode, blockBufS e := parser.ReadRow() comment := Commentf("input = %q, row = %d, err = %s", tc.input, i+1, errors.ErrorStack(e)) c.Assert(e, IsNil, comment) - c.Assert(parser.LastRow(), DeepEquals, mydump.Row{RowID: int64(i) + 1, Row: row}, comment) + c.Assert(parser.LastRow().RowID, DeepEquals, int64(i)+1) + c.Assert(parser.LastRow().Row, DeepEquals, row) } c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF, Commentf("input = %q", tc.input)) } @@ -76,6 +77,7 @@ func (s *testMydumpParserSuite) TestReadRow(c *C) { types.NewIntDatum(-2), types.NewUintDatum(3), }, + Length: 62, }) c.Assert(parser.Columns(), DeepEquals, []string{"columns", "more", "columns"}) offset, rowID := parser.Pos() @@ -90,6 +92,7 @@ func (s *testMydumpParserSuite) TestReadRow(c *C) { types.NewStringDatum("5."), types.NewUintDatum(6), }, + Length: 6, }) 
c.Assert(parser.Columns(), DeepEquals, []string{"columns", "more", "columns"}) offset, rowID = parser.Pos() @@ -104,6 +107,7 @@ func (s *testMydumpParserSuite) TestReadRow(c *C) { types.NewUintDatum(8), types.NewUintDatum(9), }, + Length: 42, }) c.Assert(parser.Columns(), DeepEquals, []string{"x", "y", "z"}) offset, rowID = parser.Pos() @@ -122,6 +126,7 @@ func (s *testMydumpParserSuite) TestReadRow(c *C) { types.NewUintDatum(14), types.NewStringDatum(")"), }, + Length: 49, }) c.Assert(parser.Columns(), IsNil) offset, rowID = parser.Pos() diff --git a/pkg/lightning/mydump/region.go b/pkg/lightning/mydump/region.go index 1905ad41ffac8..f17440e8cdda2 100644 --- a/pkg/lightning/mydump/region.go +++ b/pkg/lightning/mydump/region.go @@ -20,8 +20,6 @@ import ( "sync" "time" - "github.com/pingcap/br/pkg/utils" - "github.com/pingcap/errors" "go.uber.org/zap" @@ -29,6 +27,7 @@ import ( "github.com/pingcap/br/pkg/lightning/log" "github.com/pingcap/br/pkg/lightning/worker" "github.com/pingcap/br/pkg/storage" + "github.com/pingcap/br/pkg/utils" ) const tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024 @@ -226,12 +225,21 @@ func MakeTableRegions( prevRowIDMax = fileRegionsRes.regions[len(fileRegionsRes.regions)-1].Chunk.RowIDMax } + batchSize := float64(cfg.Mydumper.BatchSize) + if cfg.Mydumper.BatchSize <= 0 { + if meta.IsRowOrdered { + batchSize = float64(config.DefaultBatchSize) + } else { + batchSize = math.Max(float64(config.DefaultBatchSize), float64(meta.TotalSize)) + } + } + log.L().Info("makeTableRegions", zap.Int("filesCount", len(meta.DataFiles)), - zap.Int64("maxRegionSize", int64(cfg.Mydumper.MaxRegionSize)), + zap.Int64("MaxRegionSize", int64(cfg.Mydumper.MaxRegionSize)), zap.Int("RegionsCount", len(filesRegions)), + zap.Float64("BatchSize", batchSize), zap.Duration("cost", time.Since(start))) - - AllocateEngineIDs(filesRegions, dataFileSizes, float64(cfg.Mydumper.BatchSize), cfg.Mydumper.BatchImportRatio, float64(cfg.App.TableConcurrency)) + 
AllocateEngineIDs(filesRegions, dataFileSizes, batchSize, cfg.Mydumper.BatchImportRatio, float64(cfg.App.TableConcurrency)) return filesRegions, nil } diff --git a/pkg/lightning/restore/check_info.go b/pkg/lightning/restore/check_info.go index d30d7a1321e83..6bc4691b361e6 100644 --- a/pkg/lightning/restore/check_info.go +++ b/pkg/lightning/restore/check_info.go @@ -14,6 +14,7 @@ package restore import ( + "bytes" "context" "fmt" "io" @@ -23,18 +24,23 @@ import ( "github.com/docker/go-units" "github.com/pingcap/errors" - "github.com/pingcap/log" + "github.com/pingcap/failpoint" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/table/tables" "github.com/tikv/pd/pkg/typeutil" "github.com/tikv/pd/server/api" - "github.com/tikv/pd/server/config" + pdconfig "github.com/tikv/pd/server/config" "go.uber.org/zap" "github.com/pingcap/br/pkg/lightning/backend" + "github.com/pingcap/br/pkg/lightning/backend/kv" "github.com/pingcap/br/pkg/lightning/checkpoints" "github.com/pingcap/br/pkg/lightning/common" - md "github.com/pingcap/br/pkg/lightning/mydump" + "github.com/pingcap/br/pkg/lightning/config" + "github.com/pingcap/br/pkg/lightning/log" + "github.com/pingcap/br/pkg/lightning/mydump" + "github.com/pingcap/br/pkg/lightning/verification" "github.com/pingcap/br/pkg/storage" ) @@ -52,7 +58,9 @@ const ( pdStores = "/pd/api/v1/stores" pdReplicate = "/pd/api/v1/config/replicate" - defaultCSVSize = 10 * units.GiB + defaultCSVSize = 10 * units.GiB + maxSampleDataSize = 10 * 1024 * 1024 + maxSampleRowCount = 10 * 1024 ) func (rc *Controller) isSourceInLocal() bool { @@ -60,7 +68,7 @@ func (rc *Controller) isSourceInLocal() bool { } func (rc *Controller) getReplicaCount(ctx context.Context) (uint64, error) { - result := &config.ReplicationConfig{} + result := &pdconfig.ReplicationConfig{} err := rc.tls.WithHost(rc.cfg.TiDB.PdAddr).GetJSON(ctx, pdReplicate, &result) if err != nil { return 0, errors.Trace(err) @@ -69,7 +77,7 @@ func (rc 
*Controller) getReplicaCount(ctx context.Context) (uint64, error) { } // ClusterResource check cluster has enough resource to import data. this test can by skipped. -func (rc *Controller) ClusterResource(ctx context.Context) error { +func (rc *Controller) ClusterResource(ctx context.Context, localSource int64) error { passed := true message := "Cluster resources are rich for this import task" defer func() { @@ -81,33 +89,30 @@ func (rc *Controller) ClusterResource(ctx context.Context) error { if err != nil { return errors.Trace(err) } - totalAvailable := typeutil.ByteSize(0) + totalCapacity := typeutil.ByteSize(0) for _, store := range result.Stores { - totalAvailable += store.Status.Available + totalCapacity += store.Status.Capacity } - var sourceSize int64 - err = rc.store.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error { - sourceSize += size - return nil - }) - if err != nil { - return errors.Trace(err) + clusterSource := localSource + if rc.taskMgr != nil { + clusterSource, err = rc.taskMgr.CheckClusterSource(ctx) + if err != nil { + return errors.Trace(err) + } } + replicaCount, err := rc.getReplicaCount(ctx) if err != nil { return errors.Trace(err) } - // sourceSize is the total size of current csv/parquet/sql files. - // it's not a simple multiple relationship with the final cluster occupancy, because - // 1. sourceSize didn't compress with RocksDB. - // 2. the index size was not included in sourceSize. - // so we have to make estimateSize redundant with 1.5. 
- estimateSize := uint64(sourceSize) * replicaCount * 3 / 2 - - if typeutil.ByteSize(estimateSize) > totalAvailable { + estimateSize := uint64(clusterSource) * replicaCount + if typeutil.ByteSize(estimateSize) > totalCapacity { passed = false - message = fmt.Sprintf("Cluster doesn't have enough space, %s is avaiable, but we need %s", - units.BytesSize(float64(totalAvailable)), units.BytesSize(float64(estimateSize))) + message = fmt.Sprintf("Cluster doesn't have enough space, capacity is %s, but we need %s", + units.BytesSize(float64(totalCapacity)), units.BytesSize(float64(estimateSize))) + } else { + message = fmt.Sprintf("Cluster capacity is rich, capacity is %s, we need %s", + units.BytesSize(float64(totalCapacity)), units.BytesSize(float64(estimateSize))) } return nil } @@ -163,7 +168,7 @@ func (rc *Controller) StoragePermission(ctx context.Context) error { // HasLargeCSV checks whether input csvs is fit for Lightning import. // If strictFormat is false, and csv file is large. Lightning will have performance issue. // this test cannot be skipped. 
-func (rc *Controller) HasLargeCSV(dbMetas []*md.MDDatabaseMeta) error { +func (rc *Controller) HasLargeCSV(dbMetas []*mydump.MDDatabaseMeta) error { passed := true message := "Source csv files size is proper" defer func() { @@ -186,8 +191,47 @@ func (rc *Controller) HasLargeCSV(dbMetas []*md.MDDatabaseMeta) error { return nil } +func (rc *Controller) EstimateSourceData(ctx context.Context) (int64, error) { + sourceSize := int64(0) + originSource := int64(0) + bigTableCount := 0 + tableCount := 0 + unSortedTableCount := 0 + for _, db := range rc.dbMetas { + info, ok := rc.dbInfos[db.Name] + if !ok { + continue + } + for _, tbl := range db.Tables { + tableInfo, ok := info.Tables[tbl.Name] + if ok { + if err := rc.SampleDataFromTable(ctx, db.Name, tbl, tableInfo.Core); err != nil { + return sourceSize, errors.Trace(err) + } + sourceSize += int64(float64(tbl.TotalSize) * tbl.IndexRatio) + originSource += tbl.TotalSize + if tbl.TotalSize > int64(config.DefaultBatchSize)*2 { + bigTableCount += 1 + if !tbl.IsRowOrdered { + unSortedTableCount += 1 + } + } + tableCount += 1 + } + } + } + + // Do not import with too large concurrency because these data may be all unsorted. 
+ if bigTableCount > 0 && unSortedTableCount > 0 { + if rc.cfg.App.TableConcurrency > rc.cfg.App.IndexConcurrency { + rc.cfg.App.TableConcurrency = rc.cfg.App.IndexConcurrency + } + } + return sourceSize, nil +} + // LocalResource checks the local node has enough resources for this import when local backend enabled; -func (rc *Controller) LocalResource(ctx context.Context) error { +func (rc *Controller) LocalResource(ctx context.Context, sourceSize int64) error { if rc.isSourceInLocal() { sourceDir := strings.TrimPrefix(rc.cfg.Mydumper.SourceDir, storage.LocalURIPrefix) same, err := common.SameDisk(sourceDir, rc.cfg.TikvImporter.SortedKVDir) @@ -200,46 +244,54 @@ func (rc *Controller) LocalResource(ctx context.Context) error { rc.cfg.TikvImporter.SortedKVDir, sourceDir)) } } - var sourceSize uint64 - err := rc.store.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error { - sourceSize += uint64(size) - return nil - }) - if err != nil { - return errors.Trace(err) - } storageSize, err := common.GetStorageSize(rc.cfg.TikvImporter.SortedKVDir) if err != nil { return errors.Trace(err) } localAvailable := storageSize.Available + if err = rc.taskMgr.InitTask(ctx, sourceSize); err != nil { + return errors.Trace(err) + } var message string var passed bool switch { - case localAvailable > sourceSize*3/2: - message = fmt.Sprintf("local disk resources are rich, source dir has %s, local available is %s", + case localAvailable > uint64(sourceSize): + message = fmt.Sprintf("local disk resources are rich, estimate sorted data size %s, local available is %s", units.BytesSize(float64(sourceSize)), units.BytesSize(float64(localAvailable))) passed = true default: - message = fmt.Sprintf("local disk space may not enough to finish import, source dir has %s, but local available is %s,"+ - "we may use disk-quota(%s) to finish imports", units.BytesSize(float64(sourceSize)), - units.BytesSize(float64(localAvailable)), units.BytesSize(float64(rc.cfg.TikvImporter.DiskQuota))) 
- passed = true + if int64(rc.cfg.TikvImporter.DiskQuota) > int64(localAvailable) { + message = fmt.Sprintf("local disk space may not enough to finish import"+ + "estimate sorted data size is %s, but local available is %s,"+ + "you need a smaller number for tikv-importer.disk-quota (%s) to finish imports", + units.BytesSize(float64(sourceSize)), + units.BytesSize(float64(localAvailable)), units.BytesSize(float64(rc.cfg.TikvImporter.DiskQuota))) + passed = false + log.L().Error(message) + } else { + message = fmt.Sprintf("local disk space may not enough to finish import, "+ + "estimate sorted data size is %s, but local available is %s,"+ + "we will use disk-quota (size: %s) to finish imports, which may slow down import", + units.BytesSize(float64(sourceSize)), + units.BytesSize(float64(localAvailable)), units.BytesSize(float64(rc.cfg.TikvImporter.DiskQuota))) + passed = true + log.L().Warn(message) + } } rc.checkTemplate.Collect(Critical, passed, message) return nil } // CheckpointIsValid checks whether we can start this import with this checkpoint. -func (rc *Controller) CheckpointIsValid(ctx context.Context, tableInfo *md.MDTableMeta) ([]string, bool, error) { +func (rc *Controller) CheckpointIsValid(ctx context.Context, tableInfo *mydump.MDTableMeta) ([]string, bool, error) { msgs := make([]string, 0) uniqueName := common.UniqueTable(tableInfo.DB, tableInfo.Name) tableCheckPoint, err := rc.checkpointsDB.Get(ctx, uniqueName) if err != nil { // there is no checkpoint - log.Debug("no checkpoint detected", zap.String("table", uniqueName)) + log.L().Debug("no checkpoint detected", zap.String("table", uniqueName)) return nil, true, nil } // if checkpoint enable and not missing, we skip the check table empty progress. 
@@ -262,7 +314,7 @@ func (rc *Controller) CheckpointIsValid(ctx context.Context, tableInfo *md.MDTab } } if len(columns) == 0 { - log.Debug("no valid checkpoint detected", zap.String("table", uniqueName)) + log.L().Debug("no valid checkpoint detected", zap.String("table", uniqueName)) return nil, false, nil } info := rc.dbInfos[tableInfo.DB].Tables[tableInfo.Name] @@ -286,10 +338,10 @@ func hasDefault(col *model.ColumnInfo) bool { col.IsGenerated() || mysql.HasAutoIncrementFlag(col.Flag) } -func (rc *Controller) readColumnsAndCount(ctx context.Context, dataFileMeta md.SourceFileMeta) (cols []string, colCnt int, err error) { +func (rc *Controller) readColumnsAndCount(ctx context.Context, dataFileMeta mydump.SourceFileMeta) (cols []string, colCnt int, err error) { var reader storage.ReadSeekCloser - if dataFileMeta.Type == md.SourceTypeParquet { - reader, err = md.OpenParquetReader(ctx, rc.store, dataFileMeta.Path, dataFileMeta.FileSize) + if dataFileMeta.Type == mydump.SourceTypeParquet { + reader, err = mydump.OpenParquetReader(ctx, rc.store, dataFileMeta.Path, dataFileMeta.FileSize) } else { reader, err = rc.store.Open(ctx, dataFileMeta.Path) } @@ -297,16 +349,16 @@ func (rc *Controller) readColumnsAndCount(ctx context.Context, dataFileMeta md.S return nil, 0, errors.Trace(err) } - var parser md.Parser + var parser mydump.Parser blockBufSize := int64(rc.cfg.Mydumper.ReadBlockSize) switch dataFileMeta.Type { - case md.SourceTypeCSV: + case mydump.SourceTypeCSV: hasHeader := rc.cfg.Mydumper.CSV.Header - parser = md.NewCSVParser(&rc.cfg.Mydumper.CSV, reader, blockBufSize, rc.ioWorkers, hasHeader) - case md.SourceTypeSQL: - parser = md.NewChunkParser(rc.cfg.TiDB.SQLMode, reader, blockBufSize, rc.ioWorkers) - case md.SourceTypeParquet: - parser, err = md.NewParquetParser(ctx, rc.store, reader, dataFileMeta.Path) + parser = mydump.NewCSVParser(&rc.cfg.Mydumper.CSV, reader, blockBufSize, rc.ioWorkers, hasHeader) + case mydump.SourceTypeSQL: + parser = 
mydump.NewChunkParser(rc.cfg.TiDB.SQLMode, reader, blockBufSize, rc.ioWorkers) + case mydump.SourceTypeParquet: + parser, err = mydump.NewParquetParser(ctx, rc.store, reader, dataFileMeta.Path) if err != nil { return nil, 0, errors.Trace(err) } @@ -323,7 +375,7 @@ func (rc *Controller) readColumnsAndCount(ctx context.Context, dataFileMeta md.S } // SchemaIsValid checks the import file and cluster schema is match. -func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *md.MDTableMeta) ([]string, error) { +func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTableMeta) ([]string, error) { msgs := make([]string, 0) info, ok := rc.dbInfos[tableInfo.DB].Tables[tableInfo.Name] if !ok { @@ -342,7 +394,7 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *md.MDTableMe } if len(tableInfo.DataFiles) == 0 { - log.Info("no data files detected", zap.String("db", tableInfo.DB), zap.String("table", tableInfo.Name)) + log.L().Info("no data files detected", zap.String("db", tableInfo.DB), zap.String("table", tableInfo.Name)) return nil, nil } @@ -362,7 +414,7 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *md.MDTableMe // get columns name from data file. 
dataFileMeta := dataFile.FileMeta - if tp := dataFileMeta.Type; tp != md.SourceTypeCSV && tp != md.SourceTypeSQL && tp != md.SourceTypeParquet { + if tp := dataFileMeta.Type; tp != mydump.SourceTypeCSV && tp != mydump.SourceTypeSQL && tp != mydump.SourceTypeParquet { msgs = append(msgs, fmt.Sprintf("file '%s' with unknown source type '%s'", dataFileMeta.Path, dataFileMeta.Type.String())) return msgs, nil } @@ -371,7 +423,7 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *md.MDTableMe return nil, errors.Trace(err) } if colsFromDataFile == nil && colCountFromDataFile == 0 { - log.Info("file contains no data, skip checking against schema validity", zap.String("path", dataFileMeta.Path)) + log.L().Info("file contains no data, skip checking against schema validity", zap.String("path", dataFileMeta.Path)) continue } @@ -412,7 +464,7 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *md.MDTableMe for _, col := range colsFromDataFile { if _, ok := colMap[col]; !ok { checkMsg := "please check table schema" - if dataFileMeta.Type == md.SourceTypeCSV && rc.cfg.Mydumper.CSV.Header { + if dataFileMeta.Type == mydump.SourceTypeCSV && rc.cfg.Mydumper.CSV.Header { checkMsg += " and csv file header" } msgs = append(msgs, fmt.Sprintf("TiDB schema `%s`.`%s` doesn't have column %s, "+ @@ -439,3 +491,126 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *md.MDTableMe } return msgs, nil } + +func (rc *Controller) SampleDataFromTable(ctx context.Context, dbName string, tableMeta *mydump.MDTableMeta, tableInfo *model.TableInfo) error { + if len(tableMeta.DataFiles) == 0 { + return nil + } + sampleFile := tableMeta.DataFiles[0].FileMeta + var reader storage.ReadSeekCloser + var err error + if sampleFile.Type == mydump.SourceTypeParquet { + reader, err = mydump.OpenParquetReader(ctx, rc.store, sampleFile.Path, sampleFile.FileSize) + } else { + reader, err = rc.store.Open(ctx, sampleFile.Path) + } + if err != nil { + return 
errors.Trace(err) + } + idAlloc := kv.NewPanickingAllocators(0) + tbl, err := tables.TableFromMeta(idAlloc, tableInfo) + + kvEncoder, err := rc.backend.NewEncoder(tbl, &kv.SessionOptions{ + SQLMode: rc.cfg.TiDB.SQLMode, + Timestamp: 0, + SysVars: rc.sysVars, + AutoRandomSeed: 0, + }) + blockBufSize := int64(rc.cfg.Mydumper.ReadBlockSize) + + var parser mydump.Parser + switch tableMeta.DataFiles[0].FileMeta.Type { + case mydump.SourceTypeCSV: + hasHeader := rc.cfg.Mydumper.CSV.Header + parser = mydump.NewCSVParser(&rc.cfg.Mydumper.CSV, reader, blockBufSize, rc.ioWorkers, hasHeader) + case mydump.SourceTypeSQL: + parser = mydump.NewChunkParser(rc.cfg.TiDB.SQLMode, reader, blockBufSize, rc.ioWorkers) + case mydump.SourceTypeParquet: + parser, err = mydump.NewParquetParser(ctx, rc.store, reader, sampleFile.Path) + if err != nil { + return errors.Trace(err) + } + default: + panic(fmt.Sprintf("file '%s' with unknown source type '%s'", sampleFile.Path, sampleFile.Type.String())) + } + defer parser.Close() + logTask := log.With(zap.String("table", tableMeta.Name)).Begin(zap.InfoLevel, "sample file") + igCols, err := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(dbName, tableMeta.Name, rc.cfg.Mydumper.CaseSensitive) + if err != nil { + return errors.Trace(err) + } + + initializedColumns, reachEOF := false, false + var columnPermutation []int + var kvSize uint64 = 0 + var rowSize uint64 = 0 + rowCount := 0 + dataKVs := rc.backend.MakeEmptyRows() + indexKVs := rc.backend.MakeEmptyRows() + lastKey := make([]byte, 0) + tableMeta.IsRowOrdered = true + tableMeta.IndexRatio = 1.0 +outloop: + for !reachEOF { + offset, _ := parser.Pos() + err = parser.ReadRow() + columnNames := parser.Columns() + + switch errors.Cause(err) { + case nil: + if !initializedColumns { + if len(columnPermutation) == 0 { + columnPermutation, err = createColumnPermutation(columnNames, igCols.Columns, tableInfo) + if err != nil { + return errors.Trace(err) + } + } + initializedColumns = true + } + case 
io.EOF: + reachEOF = true + break outloop + default: + err = errors.Annotatef(err, "in file offset %d", offset) + return errors.Trace(err) + } + lastRow := parser.LastRow() + rowSize += uint64(lastRow.Length) + rowCount += 1 + + var dataChecksum, indexChecksum verification.KVChecksum + kvs, encodeErr := kvEncoder.Encode(logTask.Logger, lastRow.Row, lastRow.RowID, columnPermutation, offset) + parser.RecycleRow(lastRow) + if encodeErr != nil { + err = errors.Annotatef(encodeErr, "in file at offset %d", offset) + return errors.Trace(err) + } + if tableMeta.IsRowOrdered { + kvs.ClassifyAndAppend(&dataKVs, &dataChecksum, &indexKVs, &indexChecksum) + for _, kv := range kv.KvPairsFromRows(dataKVs) { + if len(lastKey) == 0 { + lastKey = kv.Key + } else if bytes.Compare(lastKey, kv.Key) > 0 { + tableMeta.IsRowOrdered = false + break + } + } + dataKVs = dataKVs.Clear() + indexKVs = indexKVs.Clear() + } + kvSize += kvs.Size() + + failpoint.Inject("mock-kv-size", func(val failpoint.Value) { + kvSize += uint64(val.(int)) + }) + if rowSize > maxSampleDataSize && rowCount > maxSampleRowCount { + break + } + } + + if rowSize > 0 && kvSize > rowSize { + tableMeta.IndexRatio = float64(kvSize) / float64(rowSize) + } + log.L().Info("Sample source data", zap.String("table", tableMeta.Name), zap.Float64("IndexRatio", tableMeta.IndexRatio), zap.Bool("IsSourceOrder", tableMeta.IsRowOrdered)) + return nil +} diff --git a/pkg/lightning/restore/meta_manager.go b/pkg/lightning/restore/meta_manager.go index ab77fdf8bc02b..f80ee263359be 100644 --- a/pkg/lightning/restore/meta_manager.go +++ b/pkg/lightning/restore/meta_manager.go @@ -460,11 +460,18 @@ func (m *dbTableMetaMgr) FinishTable(ctx context.Context) error { } type taskMetaMgr interface { - InitTask(ctx context.Context) error + InitTask(ctx context.Context, source int64) error + CheckClusterSource(ctx context.Context) (int64, error) + CheckTaskExist(ctx context.Context) (bool, error) CheckAndPausePdSchedulers(ctx context.Context) 
(pdutil.UndoFunc, error) - CheckAndFinishRestore(ctx context.Context) (bool, error) + // CheckAndFinishRestore check task meta and return whether to switch cluster to normal state and clean up the metadata + // Return values: first boolean indicates whether switch back tidb cluster to normal state (restore schedulers, switch tikv to normal) + // the second boolean indicates whether to clean up the metadata in tidb + CheckAndFinishRestore(ctx context.Context, finished bool) (shouldSwitchBack bool, shouldCleanupMeta bool, err error) Cleanup(ctx context.Context) error + CleanupTask(ctx context.Context) error CleanupAllMetas(ctx context.Context) error + Close() } type dbTaskMetaMgr struct { @@ -485,6 +492,11 @@ const ( taskMetaStatusSwitchBack ) +const ( + taskStateNormal int = iota + taskStateExited +) + func (m taskMetaStatus) String() string { switch m { case taskMetaStatusInitial: @@ -520,17 +532,65 @@ type storedCfgs struct { RestoreCfg pdutil.ClusterConfig `json:"restore"` } -func (m *dbTaskMetaMgr) InitTask(ctx context.Context) error { +func (m *dbTaskMetaMgr) InitTask(ctx context.Context, source int64) error { exec := &common.SQLWithRetry{ DB: m.session, Logger: log.L(), } // avoid override existing metadata if the meta is already inserted. - stmt := fmt.Sprintf(`INSERT IGNORE INTO %s (task_id, status) values (?, ?)`, m.tableName) - err := exec.Exec(ctx, "init task meta", stmt, m.taskID, taskMetaStatusInitial.String()) + stmt := fmt.Sprintf(`INSERT INTO %s (task_id, status, source_bytes) values (?, ?, ?) ON DUPLICATE KEY UPDATE state = ?`, m.tableName) + err := exec.Exec(ctx, "init task meta", stmt, m.taskID, taskMetaStatusInitial.String(), source, taskStateNormal) return errors.Trace(err) } +func (m *dbTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) { + exec := &common.SQLWithRetry{ + DB: m.session, + Logger: log.L(), + } + // avoid override existing metadata if the meta is already inserted. 
+ exist := false + err := exec.Transact(ctx, "check whether this task has started before", func(ctx context.Context, tx *sql.Tx) error { + query := fmt.Sprintf("SELECT task_id from %s WHERE task_id = %d", m.tableName, m.taskID) + rows, err := tx.QueryContext(ctx, query) + if err != nil { + return errors.Annotate(err, "fetch task meta failed") + } + var taskID int64 + for rows.Next() { + if err = rows.Scan(&taskID); err != nil { + rows.Close() + return errors.Trace(err) + } + if taskID == m.taskID { + exist = true + } + } + err = rows.Close() + return errors.Trace(err) + }) + return exist, errors.Trace(err) +} + +func (m *dbTaskMetaMgr) CheckClusterSource(ctx context.Context) (int64, error) { + conn, err := m.session.Conn(ctx) + if err != nil { + return 0, errors.Trace(err) + } + defer conn.Close() + exec := &common.SQLWithRetry{ + DB: m.session, + Logger: log.L(), + } + + source := int64(0) + query := fmt.Sprintf("SELECT SUM(source_bytes) from %s", m.tableName) + if err := exec.QueryRow(ctx, "query total source size", query, &source); err != nil { + return 0, errors.Annotate(err, "fetch task meta failed") + } + return source, nil +} + func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error) { pauseCtx, cancel := context.WithCancel(ctx) conn, err := m.session.Conn(ctx) @@ -553,7 +613,7 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U paused := false var pausedCfg storedCfgs err = exec.Transact(ctx, "check and pause schedulers", func(ctx context.Context, tx *sql.Tx) error { - query := fmt.Sprintf("SELECT task_id, pd_cfgs, status from %s FOR UPDATE", m.tableName) + query := fmt.Sprintf("SELECT task_id, pd_cfgs, status, state from %s FOR UPDATE", m.tableName) rows, err := tx.QueryContext(ctx, query) if err != nil { return errors.Annotate(err, "fetch task meta failed") @@ -568,10 +628,11 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U taskID int64 cfg string 
statusValue string + state int ) var cfgStr string for rows.Next() { - if err = rows.Scan(&taskID, &cfg, &statusValue); err != nil { + if err = rows.Scan(&taskID, &cfg, &statusValue, &state); err != nil { return errors.Trace(err) } status, err := parseTaskMetaStatus(statusValue) @@ -648,10 +709,13 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U }, nil } -func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) { +// CheckAndFinishRestore check task meta and return whether to switch cluster to normal state and clean up the metadata +// Return values: first boolean indicates whether switch back tidb cluster to normal state (restore schedulers, switch tikv to normal) +// the second boolean indicates whether to clean up the metadata in tidb +func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context, finished bool) (bool, bool, error) { conn, err := m.session.Conn(ctx) if err != nil { - return false, errors.Trace(err) + return false, false, errors.Trace(err) } defer conn.Close() exec := &common.SQLWithRetry{ @@ -660,12 +724,13 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) } err = exec.Exec(ctx, "enable pessimistic transaction", "SET SESSION tidb_txn_mode = 'pessimistic';") if err != nil { - return false, errors.Annotate(err, "enable pessimistic transaction failed") + return false, false, errors.Annotate(err, "enable pessimistic transaction failed") } switchBack := true + allFinished := finished err = exec.Transact(ctx, "check and finish schedulers", func(ctx context.Context, tx *sql.Tx) error { - query := fmt.Sprintf("SELECT task_id, status from %s FOR UPDATE", m.tableName) + query := fmt.Sprintf("SELECT task_id, status, state from %s FOR UPDATE", m.tableName) rows, err := tx.QueryContext(ctx, query) if err != nil { return errors.Annotate(err, "fetch task meta failed") @@ -679,10 +744,12 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, 
error) var ( taskID int64 statusValue string + state int ) - newStatus := taskMetaStatusSwitchBack + + taskStatus := taskMetaStatusInitial for rows.Next() { - if err = rows.Scan(&taskID, &statusValue); err != nil { + if err = rows.Scan(&taskID, &statusValue, &state); err != nil { return errors.Trace(err) } status, err := parseTaskMetaStatus(statusValue) @@ -691,13 +758,18 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) } if taskID == m.taskID { + taskStatus = status continue } if status < taskMetaStatusSwitchSkipped { - newStatus = taskMetaStatusSwitchSkipped - switchBack = false - break + allFinished = false + // check if other task still running + if state == taskStateNormal { + log.L().Info("unfinished task found", zap.Int64("task_id", taskID), + zap.Stringer("status", status)) + switchBack = false + } } } if err = rows.Close(); err != nil { @@ -705,13 +777,28 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) } closed = true - query = fmt.Sprintf("update %s set status = ? where task_id = ?", m.tableName) - _, err = tx.ExecContext(ctx, query, newStatus.String(), m.taskID) + if taskStatus < taskMetaStatusSwitchSkipped { + newStatus := taskMetaStatusSwitchBack + newState := taskStateNormal + if !finished { + newStatus = taskStatus + newState = taskStateExited + } else if !allFinished { + newStatus = taskMetaStatusSwitchSkipped + } + + query = fmt.Sprintf("update %s set status = ?, state = ? 
where task_id = ?", m.tableName) + if _, err = tx.ExecContext(ctx, query, newStatus.String(), newState, m.taskID); err != nil { + return errors.Trace(err) + } + } - return errors.Trace(err) + return nil }) + log.L().Info("check all task finish status", zap.Bool("task_finished", finished), + zap.Bool("all_finished", allFinished), zap.Bool("switch_back", switchBack)) - return switchBack, err + return switchBack, allFinished, err } func (m *dbTaskMetaMgr) Cleanup(ctx context.Context) error { @@ -727,6 +814,20 @@ func (m *dbTaskMetaMgr) Cleanup(ctx context.Context) error { return nil } +func (m *dbTaskMetaMgr) CleanupTask(ctx context.Context) error { + exec := &common.SQLWithRetry{ + DB: m.session, + Logger: log.L(), + } + stmt := fmt.Sprintf("DELETE FROM %s WHERE task_id = %d;", m.tableName, m.taskID) + err := exec.Exec(ctx, "clean up task", stmt) + return errors.Trace(err) +} + +func (m *dbTaskMetaMgr) Close() { + m.pd.Close() +} + func (m *dbTaskMetaMgr) CleanupAllMetas(ctx context.Context) error { exec := &common.SQLWithRetry{ DB: m.session, @@ -768,7 +869,7 @@ func (b noopMetaMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr { type noopTaskMetaMgr struct{} -func (m noopTaskMetaMgr) InitTask(ctx context.Context) error { +func (m noopTaskMetaMgr) InitTask(ctx context.Context, source int64) error { return nil } @@ -778,18 +879,33 @@ func (m noopTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil. 
}, nil } -func (m noopTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) { +func (m noopTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) { return false, nil } +func (m noopTaskMetaMgr) CheckClusterSource(ctx context.Context) (int64, error) { + return 0, nil +} + +func (m noopTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (bool, bool, error) { + return false, true, nil +} + func (m noopTaskMetaMgr) Cleanup(ctx context.Context) error { return nil } +func (m noopTaskMetaMgr) CleanupTask(ctx context.Context) error { + return nil +} + func (m noopTaskMetaMgr) CleanupAllMetas(ctx context.Context) error { return nil } +func (m noopTaskMetaMgr) Close() { +} + type noopTableMetaMgr struct{} func (m noopTableMetaMgr) InitTableMeta(ctx context.Context) error { diff --git a/pkg/lightning/restore/restore.go b/pkg/lightning/restore/restore.go index b96082d441e6e..f9fa646922f93 100644 --- a/pkg/lightning/restore/restore.go +++ b/pkg/lightning/restore/restore.go @@ -101,6 +101,8 @@ const ( task_id BIGINT(20) UNSIGNED NOT NULL, pd_cfgs VARCHAR(2048) NOT NULL DEFAULT '', status VARCHAR(32) NOT NULL, + state TINYINT(1) NOT NULL DEFAULT 0 COMMENT '0: normal, 1: exited before finish', + source_bytes BIGINT(20) UNSIGNED NOT NULL DEFAULT 0, PRIMARY KEY (task_id) );` @@ -248,6 +250,7 @@ type Controller struct { closedEngineLimit *worker.Pool store storage.ExternalStorage metaMgrBuilder metaMgrBuilder + taskMgr taskMetaMgr diskQuotaLock *diskQuotaLock diskQuotaState atomic.Int32 @@ -354,8 +357,8 @@ func NewRestoreControllerWithPauser( rc := &Controller{ cfg: cfg, dbMetas: dbMetas, - tableWorkers: worker.NewPool(ctx, cfg.App.TableConcurrency, "table"), - indexWorkers: worker.NewPool(ctx, cfg.App.IndexConcurrency, "index"), + tableWorkers: nil, + indexWorkers: nil, regionWorkers: worker.NewPool(ctx, cfg.App.RegionConcurrency, "region"), ioWorkers: worker.NewPool(ctx, cfg.App.IOConcurrency, "io"), checksumWorks: worker.NewPool(ctx, 
cfg.TiDB.ChecksumTableConcurrency, "checksum"), @@ -374,6 +377,7 @@ func NewRestoreControllerWithPauser( store: s, metaMgrBuilder: metaBuilder, diskQuotaLock: newDiskQuotaLock(), + taskMgr: nil, } return rc, nil @@ -386,9 +390,9 @@ func (rc *Controller) Close() { func (rc *Controller) Run(ctx context.Context) error { opts := []func(context.Context) error{ - rc.preCheckRequirements, rc.setGlobalVariables, rc.restoreSchema, + rc.preCheckRequirements, rc.restoreTables, rc.fullCompact, rc.switchToNormalMode, @@ -845,7 +849,12 @@ func verifyLocalFile(ctx context.Context, cpdb checkpoints.DB, dir string) error func (rc *Controller) estimateChunkCountIntoMetrics(ctx context.Context) error { estimatedChunkCount := 0.0 estimatedEngineCnt := int64(0) - batchSize := int64(rc.cfg.Mydumper.BatchSize) + batchSize := rc.cfg.Mydumper.BatchSize + if batchSize <= 0 { + // if rows in source files are not sorted by primary key(if primary is number or cluster index enabled), + // the key range in each data engine may have overlap, thus a bigger engine size can somewhat alleviate it. 
+ batchSize = config.DefaultBatchSize + } for _, dbMeta := range rc.dbMetas { for _, tableMeta := range dbMeta.Tables { tableName := common.UniqueTable(dbMeta.Name, tableMeta.Name) @@ -872,7 +881,7 @@ func (rc *Controller) estimateChunkCountIntoMetrics(ctx context.Context) error { } // estimate engines count if engine cp is empty if len(dbCp.Engines) == 0 { - estimatedEngineCnt += ((tableMeta.TotalSize + batchSize - 1) / batchSize) + 1 + estimatedEngineCnt += ((tableMeta.TotalSize + int64(batchSize) - 1) / int64(batchSize)) + 1 } for _, fileMeta := range tableMeta.DataFiles { if cnt, ok := fileChunks[fileMeta.FileMeta.Path]; ok { @@ -1179,9 +1188,11 @@ var checksumManagerKey struct{} func (rc *Controller) restoreTables(ctx context.Context) error { logTask := log.L().Begin(zap.InfoLevel, "restore all tables data") - - if err := rc.metaMgrBuilder.Init(ctx); err != nil { - return err + if rc.tableWorkers == nil { + rc.tableWorkers = worker.NewPool(ctx, rc.cfg.App.TableConcurrency, "table") + } + if rc.indexWorkers == nil { + rc.indexWorkers = worker.NewPool(ctx, rc.cfg.App.IndexConcurrency, "index") } // for local backend, we should disable some pd scheduler and change some settings, to @@ -1193,27 +1204,17 @@ func (rc *Controller) restoreTables(ctx context.Context) error { // we do not do switch back automatically cleanupFunc := func() {} switchBack := false + taskFinished := false if rc.cfg.TikvImporter.Backend == config.BackendLocal { - // disable some pd schedulers - pdController, err := pdutil.NewPdController(ctx, rc.cfg.TiDB.PdAddr, - rc.tls.TLSConfig(), rc.tls.ToPDSecurityOption()) - if err != nil { - return errors.Trace(err) - } - - mgr := rc.metaMgrBuilder.TaskMetaMgr(pdController) - if err = mgr.InitTask(ctx); err != nil { - return err - } logTask.Info("removing PD leader&region schedulers") - restoreFn, err := mgr.CheckAndPausePdSchedulers(ctx) + restoreFn, err := rc.taskMgr.CheckAndPausePdSchedulers(ctx) finishSchedulers = func() { if restoreFn != nil { //
use context.Background to make sure this restore function can still be executed even if ctx is canceled restoreCtx := context.Background() - needSwitchBack, err := mgr.CheckAndFinishRestore(restoreCtx) + needSwitchBack, needCleanup, err := rc.taskMgr.CheckAndFinishRestore(restoreCtx, taskFinished) if err != nil { logTask.Warn("check restore pd schedulers failed", zap.Error(err)) return @@ -1223,22 +1224,25 @@ func (rc *Controller) restoreTables(ctx context.Context) error { if restoreE := restoreFn(restoreCtx); restoreE != nil { logTask.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) } + + logTask.Info("add back PD leader®ion schedulers") // clean up task metas - if cleanupErr := mgr.Cleanup(restoreCtx); cleanupErr != nil { - logTask.Warn("failed to clean task metas, you may need to restore them manually", zap.Error(cleanupErr)) - } - // cleanup table meta and schema db if needed. - cleanupFunc = func() { - if e := mgr.CleanupAllMetas(restoreCtx); err != nil { - logTask.Warn("failed to clean table task metas, you may need to restore them manually", zap.Error(e)) + if needCleanup { + logTask.Info("cleanup task metas") + if cleanupErr := rc.taskMgr.Cleanup(restoreCtx); cleanupErr != nil { + logTask.Warn("failed to clean task metas, you may need to restore them manually", zap.Error(cleanupErr)) + } + // cleanup table meta and schema db if needed. 
+ cleanupFunc = func() { + if e := rc.taskMgr.CleanupAllMetas(restoreCtx); err != nil { + logTask.Warn("failed to clean table task metas, you may need to restore them manually", zap.Error(e)) + } } } } - - logTask.Info("add back PD leader®ion schedulers") } - pdController.Close() + rc.taskMgr.Close() } if err != nil { @@ -1433,6 +1437,7 @@ func (rc *Controller) restoreTables(ctx context.Context) error { // finishSchedulers() // cancelFunc(switchBack) // finishFuncCalled = true + taskFinished = true close(postProcessTaskChan) // otherwise, we should run all tasks in the post-process task chan @@ -1565,43 +1570,6 @@ func (tr *TableRestore) restoreTable( return tr.postProcess(ctx, rc, cp, false /* force-analyze */, metaMgr) } -// estimate SST files compression threshold by total row file size -// with a higher compression threshold, the compression time increases, but the iteration time decreases. -// Try to limit the total SST files number under 500. But size compress 32GB SST files cost about 20min, -// we set the upper bound to 32GB to avoid too long compression time. -// factor is the non-clustered(1 for data engine and number of non-clustered index count for index engine). 
-func estimateCompactionThreshold(cp *checkpoints.TableCheckpoint, factor int64) int64 { - totalRawFileSize := int64(0) - var lastFile string - for _, engineCp := range cp.Engines { - for _, chunk := range engineCp.Chunks { - if chunk.FileMeta.Path == lastFile { - continue - } - size := chunk.FileMeta.FileSize - if chunk.FileMeta.Type == mydump.SourceTypeParquet { - // parquet file is compressed, thus estimates with a factor of 2 - size *= 2 - } - totalRawFileSize += size - lastFile = chunk.FileMeta.Path - } - } - totalRawFileSize *= factor - - // try restrict the total file number within 512 - threshold := totalRawFileSize / 512 - threshold = utils.NextPowerOfTwo(threshold) - if threshold < compactionLowerThreshold { - // disable compaction if threshold is smaller than lower bound - threshold = 0 - } else if threshold > compactionUpperThreshold { - threshold = compactionUpperThreshold - } - - return threshold -} - // do full compaction for the whole data. func (rc *Controller) fullCompact(ctx context.Context) error { if !rc.cfg.PostRestore.Compact { @@ -1798,10 +1766,6 @@ func (rc *Controller) isLocalBackend() bool { // 3. Lightning configuration // before restore tables start. 
func (rc *Controller) preCheckRequirements(ctx context.Context) error { - if !rc.cfg.App.CheckRequirements { - log.L().Info("skip pre check due to user requirement") - return nil - } if err := rc.ClusterIsAvailable(ctx); err != nil { return errors.Trace(err) } @@ -1809,15 +1773,50 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error { if err := rc.StoragePermission(ctx); err != nil { return errors.Trace(err) } - - if err := rc.ClusterResource(ctx); err != nil { - return errors.Trace(err) + if err := rc.metaMgrBuilder.Init(ctx); err != nil { + return err } + taskExist := false if rc.isLocalBackend() { - if err := rc.LocalResource(ctx); err != nil { + source, err := rc.EstimateSourceData(ctx) + if err != nil { + return errors.Trace(err) + } + + pdController, err := pdutil.NewPdController(ctx, rc.cfg.TiDB.PdAddr, + rc.tls.TLSConfig(), rc.tls.ToPDSecurityOption()) + if err != nil { + return errors.Trace(err) + } + + rc.taskMgr = rc.metaMgrBuilder.TaskMetaMgr(pdController) + taskExist, err = rc.taskMgr.CheckTaskExist(ctx) + if err != nil { return errors.Trace(err) } + if !taskExist { + err = rc.LocalResource(ctx, source) + if err != nil { + rc.taskMgr.CleanupTask(ctx) + return errors.Trace(err) + } + if err := rc.ClusterResource(ctx, source); err != nil { + rc.taskMgr.CleanupTask(ctx) + return errors.Trace(err) + } + } + } + if rc.cfg.App.CheckRequirements && rc.tidbGlue.OwnsSQLExecutor() { + // print check template only if check requirements is true. + fmt.Print(rc.checkTemplate.Output()) + if !rc.checkTemplate.Success() { + if !taskExist && rc.taskMgr != nil { + rc.taskMgr.CleanupTask(ctx) + } + return errors.Errorf("tidb-lightning pre-check failed." 
+ + " Please fix the failed check(s) or set --check-requirements=false to skip checks") + } } return nil } diff --git a/pkg/lightning/restore/restore_test.go b/pkg/lightning/restore/restore_test.go index 1ca80c9417751..44a611d1d24d5 100644 --- a/pkg/lightning/restore/restore_test.go +++ b/pkg/lightning/restore/restore_test.go @@ -948,8 +948,8 @@ func (s *tableRestoreSuite) TestTableRestoreMetrics(c *C) { s.tableInfo.DB: s.dbInfo, }, tableWorkers: worker.NewPool(ctx, 6, "table"), - indexWorkers: worker.NewPool(ctx, 2, "index"), ioWorkers: worker.NewPool(ctx, 5, "io"), + indexWorkers: worker.NewPool(ctx, 2, "index"), regionWorkers: worker.NewPool(ctx, 10, "region"), checksumWorks: worker.NewPool(ctx, 2, "region"), saveCpCh: chptCh, @@ -1598,7 +1598,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource(c *C) { "id": 2 }, "status": { - "available": "24" + "capacity": "24" } } ] @@ -1606,7 +1606,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource(c *C) { []byte(`{ "max-replicas": 1 }`), - "(.*)Cluster resources are rich for this import task(.*)", + "(.*)Cluster capacity is rich(.*)", true, 0, }, @@ -1619,7 +1619,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource(c *C) { "id": 2 }, "status": { - "available": "23" + "capacity": "15" } } ] @@ -1664,7 +1664,12 @@ func (s *tableRestoreSuite) TestCheckClusterResource(c *C) { url := strings.TrimPrefix(server.URL, "https://") cfg := &config.Config{TiDB: config.DBStore{PdAddr: url}} rc := &Controller{cfg: cfg, tls: tls, store: mockStore, checkTemplate: template} - err := rc.ClusterResource(ctx) + var sourceSize int64 + err = rc.store.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error { + sourceSize += size + return nil + }) + err = rc.ClusterResource(ctx, sourceSize) c.Assert(err, IsNil) c.Assert(template.FailedCount(Critical), Equals, ca.expectErrorCount) diff --git a/pkg/lightning/restore/table_restore.go b/pkg/lightning/restore/table_restore.go index 9692648268d49..ac53176adfd65 100644 
--- a/pkg/lightning/restore/table_restore.go +++ b/pkg/lightning/restore/table_restore.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/br/pkg/lightning/mydump" verify "github.com/pingcap/br/pkg/lightning/verification" "github.com/pingcap/br/pkg/lightning/worker" + "github.com/pingcap/br/pkg/utils" ) type TableRestore struct { @@ -157,13 +158,22 @@ func (t *TableRestore) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowIDB // // The argument `columns` _must_ be in lower case. func (tr *TableRestore) initializeColumns(columns []string, ccp *checkpoints.ChunkCheckpoint) error { + colPerm, err := createColumnPermutation(columns, tr.ignoreColumns, tr.tableInfo.Core) + if err != nil { + return err + } + ccp.ColumnPermutation = colPerm + return nil +} + +func createColumnPermutation(columns []string, ignoreColumns []string, tableInfo *model.TableInfo) ([]int, error) { var colPerm []int if len(columns) == 0 { - colPerm = make([]int, 0, len(tr.tableInfo.Core.Columns)+1) - shouldIncludeRowID := common.TableHasAutoRowID(tr.tableInfo.Core) + colPerm = make([]int, 0, len(tableInfo.Columns)+1) + shouldIncludeRowID := common.TableHasAutoRowID(tableInfo) // no provided columns, so use identity permutation. 
- for i := range tr.tableInfo.Core.Columns { + for i := range tableInfo.Columns { colPerm = append(colPerm, i) } if shouldIncludeRowID { @@ -171,14 +181,12 @@ func (tr *TableRestore) initializeColumns(columns []string, ccp *checkpoints.Chu } } else { var err error - colPerm, err = parseColumnPermutations(tr.tableInfo.Core, columns, tr.ignoreColumns) + colPerm, err = parseColumnPermutations(tableInfo, columns, ignoreColumns) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } } - - ccp.ColumnPermutation = colPerm - return nil + return colPerm, nil } func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp *checkpoints.TableCheckpoint) error { @@ -282,26 +290,21 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp if engine.Status < checkpoints.CheckpointStatusImported { wg.Add(1) - // Note: We still need tableWorkers to control the concurrency of tables. - // In the future, we will investigate more about - // the difference between restoring tables concurrently and restoring tables one by one. + // If the number of chunks is small, it means that this engine may be finished in a few times. 
+ // We do not limit it in TableConcurrency restoreWorker := rc.tableWorkers.Apply() - go func(w *worker.Worker, eid int32, ecp *checkpoints.EngineCheckpoint) { defer wg.Done() - engineLogTask := tr.logger.With(zap.Int32("engineNumber", eid)).Begin(zap.InfoLevel, "restore engine") dataClosedEngine, err := tr.restoreEngine(ctx, rc, indexEngine, eid, ecp) engineLogTask.End(zap.ErrorLevel, err) rc.tableWorkers.Recycle(w) - if err != nil { - setError(err) - return + if err == nil { + dataWorker := rc.closedEngineLimit.Apply() + defer rc.closedEngineLimit.Recycle(dataWorker) + err = tr.importEngine(ctx, dataClosedEngine, rc, eid, ecp) } - - dataWorker := rc.closedEngineLimit.Apply() - defer rc.closedEngineLimit.Recycle(dataWorker) - if err := tr.importEngine(ctx, dataClosedEngine, rc, eid, ecp); err != nil { + if err != nil { setError(err) } }(restoreWorker, engineID, engine) @@ -391,6 +394,11 @@ func (tr *TableRestore) restoreEngine( TableInfo: tr.tableInfo, Local: &backend.LocalEngineConfig{}, } + if !tr.tableMeta.IsRowOrdered { + dataEngineCfg.Local.Compact = true + dataEngineCfg.Local.CompactConcurrency = 4 + dataEngineCfg.Local.CompactThreshold = compactionUpperThreshold + } dataEngine, err := rc.backend.OpenEngine(ctx, dataEngineCfg, tr.tableName, engineID) if err != nil { return nil, errors.Trace(err) @@ -885,3 +893,40 @@ func (tr *TableRestore) analyzeTable(ctx context.Context, g glue.SQLExecutor) er task.End(zap.ErrorLevel, err) return err } + +// estimate SST files compression threshold by total row file size +// with a higher compression threshold, the compression time increases, but the iteration time decreases. +// Try to limit the total SST files number under 500. But size compress 32GB SST files cost about 20min, +// we set the upper bound to 32GB to avoid too long compression time. +// factor is the non-clustered(1 for data engine and number of non-clustered index count for index engine). 
+func estimateCompactionThreshold(cp *checkpoints.TableCheckpoint, factor int64) int64 { + totalRawFileSize := int64(0) + var lastFile string + for _, engineCp := range cp.Engines { + for _, chunk := range engineCp.Chunks { + if chunk.FileMeta.Path == lastFile { + continue + } + size := chunk.FileMeta.FileSize + if chunk.FileMeta.Type == mydump.SourceTypeParquet { + // parquet file is compressed, thus estimates with a factor of 2 + size *= 2 + } + totalRawFileSize += size + lastFile = chunk.FileMeta.Path + } + } + totalRawFileSize *= factor + + // try restrict the total file number within 512 + threshold := totalRawFileSize / 512 + threshold = utils.NextPowerOfTwo(threshold) + if threshold < compactionLowerThreshold { + // disable compaction if threshold is smaller than lower bound + threshold = 0 + } else if threshold > compactionUpperThreshold { + threshold = compactionUpperThreshold + } + + return threshold +} diff --git a/pkg/lightning/restore/tidb.go b/pkg/lightning/restore/tidb.go index 86db9cad0cfae..b7e88064a2b74 100644 --- a/pkg/lightning/restore/tidb.go +++ b/pkg/lightning/restore/tidb.go @@ -28,17 +28,15 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" - - "github.com/pingcap/br/pkg/lightning/glue" + "go.uber.org/zap" "github.com/pingcap/br/pkg/lightning/checkpoints" "github.com/pingcap/br/pkg/lightning/common" "github.com/pingcap/br/pkg/lightning/config" + "github.com/pingcap/br/pkg/lightning/glue" "github.com/pingcap/br/pkg/lightning/log" "github.com/pingcap/br/pkg/lightning/metric" "github.com/pingcap/br/pkg/lightning/mydump" - - "go.uber.org/zap" ) // defaultImportantVariables is used in ObtainImportantVariables to retrieve the system @@ -198,7 +196,7 @@ func createTableIfNotExistsStmt(p *parser.Parser, createTable, dbName, tblName s } var res strings.Builder - ctx := format.NewRestoreCtx(format.DefaultRestoreFlags, &res) + ctx := 
format.NewRestoreCtx(format.DefaultRestoreFlags|format.RestoreTiDBSpecialComment, &res) retStmts := make([]string, 0, len(stmts)) for _, stmt := range stmts { diff --git a/pkg/lightning/restore/tidb_test.go b/pkg/lightning/restore/tidb_test.go index 6f891e2adc37d..d38c25e56da2f 100644 --- a/pkg/lightning/restore/tidb_test.go +++ b/pkg/lightning/restore/tidb_test.go @@ -102,6 +102,34 @@ func (s *tidbSuite) TestCreateTableIfNotExistsStmt(c *C) { []string{"CREATE TABLE IF NOT EXISTS `testdb`.`foo` (`bar` INT(1) COMMENT 'CREATE TABLE');"}, ) + // test clustered index consistency + c.Assert( + createTableIfNotExistsStmt("CREATE TABLE `foo`(`bar` INT(1) PRIMARY KEY CLUSTERED COMMENT 'CREATE TABLE');", "foo"), + DeepEquals, + []string{"CREATE TABLE IF NOT EXISTS `testdb`.`foo` (`bar` INT(1) PRIMARY KEY /*T![clustered_index] CLUSTERED */ COMMENT 'CREATE TABLE');"}, + ) + c.Assert( + createTableIfNotExistsStmt("CREATE TABLE `foo`(`bar` INT(1) COMMENT 'CREATE TABLE', PRIMARY KEY (`bar`) NONCLUSTERED);", "foo"), + DeepEquals, + []string{"CREATE TABLE IF NOT EXISTS `testdb`.`foo` (`bar` INT(1) COMMENT 'CREATE TABLE',PRIMARY KEY(`bar`) /*T![clustered_index] NONCLUSTERED */);"}, + ) + c.Assert( + createTableIfNotExistsStmt("CREATE TABLE `foo`(`bar` INT(1) PRIMARY KEY /*T![clustered_index] NONCLUSTERED */ COMMENT 'CREATE TABLE');", "foo"), + DeepEquals, + []string{"CREATE TABLE IF NOT EXISTS `testdb`.`foo` (`bar` INT(1) PRIMARY KEY /*T![clustered_index] NONCLUSTERED */ COMMENT 'CREATE TABLE');"}, + ) + c.Assert( + createTableIfNotExistsStmt("CREATE TABLE `foo`(`bar` INT(1) COMMENT 'CREATE TABLE', PRIMARY KEY (`bar`) /*T![clustered_index] CLUSTERED */);", "foo"), + DeepEquals, + []string{"CREATE TABLE IF NOT EXISTS `testdb`.`foo` (`bar` INT(1) COMMENT 'CREATE TABLE',PRIMARY KEY(`bar`) /*T![clustered_index] CLUSTERED */);"}, + ) + + c.Assert( + createTableIfNotExistsStmt("CREATE TABLE `foo`(`bar` INT(1) PRIMARY KEY AUTO_RANDOM(2) COMMENT 'CREATE TABLE');", "foo"), + DeepEquals, + 
[]string{"CREATE TABLE IF NOT EXISTS `testdb`.`foo` (`bar` INT(1) PRIMARY KEY /*T![auto_rand] AUTO_RANDOM(2) */ COMMENT 'CREATE TABLE');"}, + ) + // upper case becomes shorter c.Assert( createTableIfNotExistsStmt("CREATE TABLE `ſ`(`ı` TINYINT(1));", "ſ"), diff --git a/pkg/logutil/context.go b/pkg/logutil/context.go new file mode 100644 index 0000000000000..05fe7f0696b53 --- /dev/null +++ b/pkg/logutil/context.go @@ -0,0 +1,51 @@ +// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. + +package logutil + +import ( + "context" + + "github.com/pingcap/log" + "go.uber.org/zap" +) + +// We cannot directly set global logger as log.L(), +// or when the global logger updated, we cannot get the latest logger. +var globalLogger *zap.Logger = nil + +// ResetGlobalLogger resets the global logger. +// Contexts have already made by `ContextWithField` would keep untouched, +// subsequent wrapping over those contexts would keep using the old global logger, +// only brand new contexts (i.e. context without logger) would be wrapped with the new global logger. +// This method is mainly for testing. +func ResetGlobalLogger(l *zap.Logger) { + globalLogger = l +} + +type loggingContextKey struct{} + +var keyLogger loggingContextKey = loggingContextKey{} + +// ContextWithField wrap a context with a logger with some fields. +func ContextWithField(c context.Context, fields ...zap.Field) context.Context { + logger := LoggerFromContext(c).With(fields...) + return context.WithValue(c, keyLogger, logger) +} + +// LoggerFromContext returns the contextual logger via the context. +// If there isn't a logger in the context, returns the global logger. +func LoggerFromContext(c context.Context) *zap.Logger { + logger, ok := c.Value(keyLogger).(*zap.Logger) + if !ok { + if globalLogger != nil { + return globalLogger + } + return log.L() + } + return logger +} + +// CL is the shorthand for LoggerFromContext. 
+func CL(c context.Context) *zap.Logger { + return LoggerFromContext(c) +} diff --git a/pkg/logutil/logging_test.go b/pkg/logutil/logging_test.go index 137a7a7ad40c2..fbd695dbb7005 100644 --- a/pkg/logutil/logging_test.go +++ b/pkg/logutil/logging_test.go @@ -3,6 +3,7 @@ package logutil_test import ( + "context" "fmt" "math" "strings" @@ -17,6 +18,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" berrors "github.com/pingcap/br/pkg/errors" "github.com/pingcap/br/pkg/logutil" @@ -208,3 +210,50 @@ func (s *testLoggingSuite) TestShortError(c *C) { assertTrimEqual(c, logutil.ShortError(err), `{"error": "test: [BR:Common:ErrInvalidArgument]invalid argument"}`) } + +type FieldEquals struct{} + +func (f FieldEquals) Info() *CheckerInfo { + return &CheckerInfo{ + Name: "FieldEquals", + Params: []string{ + "expected", + "actual", + }, + } +} + +func (f FieldEquals) Check(params []interface{}, names []string) (result bool, err string) { + expected := params[0].(zap.Field) + actual := params[1].(zap.Field) + + if !expected.Equals(actual) { + return false, "Field not match." 
+ } + return true, "" +} + +func (s *testLoggingSuite) TestContextual(c *C) { + testCore, logs := observer.New(zap.InfoLevel) + logutil.ResetGlobalLogger(zap.New(testCore)) + + ctx := context.Background() + l0 := logutil.LoggerFromContext(ctx) + l0.Info("going to take an adventure?", zap.Int("HP", 50), zap.Int("HP-MAX", 50), zap.String("character", "solte")) + lctx := logutil.ContextWithField(ctx, zap.Strings("firends", []string{"firo", "seren", "black"})) + l := logutil.LoggerFromContext(lctx) + l.Info("let's go!", zap.String("character", "solte")) + + observedLogs := logs.TakeAll() + checkLog(c, observedLogs[0], + "going to take an adventure?", zap.Int("HP", 50), zap.Int("HP-MAX", 50), zap.String("character", "solte")) + checkLog(c, observedLogs[1], + "let's go!", zap.Strings("firends", []string{"firo", "seren", "black"}), zap.String("character", "solte")) +} + +func checkLog(c *C, actual observer.LoggedEntry, message string, fields ...zap.Field) { + c.Assert(message, Equals, actual.Message) + for i, f := range fields { + c.Assert(f, FieldEquals{}, actual.Context[i]) + } +} diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index 81a223605cd89..8e60400fb2c04 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/util/codec" pd "github.com/tikv/pd/client" + pdapi "github.com/tikv/pd/server/api" "go.uber.org/zap" "google.golang.org/grpc" @@ -34,6 +35,7 @@ import ( const ( clusterVersionPrefix = "pd/api/v1/config/cluster-version" regionCountPrefix = "pd/api/v1/stats/region" + storePrefix = "pd/api/v1/store" schedulerPrefix = "pd/api/v1/schedulers" maxMsgSize = int(128 * units.MiB) // pd.ScanRegion may return a large response scheduleConfigPrefix = "pd/api/v1/config/schedule" @@ -336,6 +338,33 @@ func (p *PdController) getRegionCountWith( return 0, errors.Trace(err) } +// GetStoreInfo returns the info of store with the specified id. 
+func (p *PdController) GetStoreInfo(ctx context.Context, storeID uint64) (*pdapi.StoreInfo, error) { + return p.getStoreInfoWith(ctx, pdRequest, storeID) +} + +func (p *PdController) getStoreInfoWith( + ctx context.Context, get pdHTTPRequest, storeID uint64) (*pdapi.StoreInfo, error) { + var err error + for _, addr := range p.addrs { + query := fmt.Sprintf( + "%s/%d", + storePrefix, storeID) + v, e := get(ctx, addr, query, p.cli, http.MethodGet, nil) + if e != nil { + err = e + continue + } + store := pdapi.StoreInfo{} + err = json.Unmarshal(v, &store) + if err != nil { + return nil, errors.Trace(err) + } + return &store, nil + } + return nil, errors.Trace(err) +} + func (p *PdController) doPauseSchedulers(ctx context.Context, schedulers []string, post pdHTTPRequest) ([]string, error) { // pause this scheduler with 300 seconds body, err := json.Marshal(pauseSchedulerBody{Delay: int64(pauseTimeout)}) diff --git a/pkg/pdutil/pd_test.go b/pkg/pdutil/pd_test.go index 76c2424e0dc3c..e4e82d412171c 100644 --- a/pkg/pdutil/pd_test.go +++ b/pkg/pdutil/pd_test.go @@ -18,6 +18,8 @@ import ( . 
"github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/util/codec" + "github.com/tikv/pd/pkg/typeutil" + "github.com/tikv/pd/server/api" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/statistics" ) @@ -200,3 +202,33 @@ func (s *testPDControllerSuite) TestPDRequestRetry(c *C) { _, reqErr = pdRequest(ctx, taddr, "", cli, http.MethodGet, nil) c.Assert(reqErr, NotNil) } + +func (s *testPDControllerSuite) TestStoreInfo(c *C) { + storeInfo := api.StoreInfo{ + Status: &api.StoreStatus{ + Capacity: typeutil.ByteSize(1024), + Available: typeutil.ByteSize(1024), + }, + Store: &api.MetaStore{ + StateName: "Tombstone", + }, + } + mock := func( + _ context.Context, addr string, prefix string, _ *http.Client, _ string, _ io.Reader, + ) ([]byte, error) { + query := fmt.Sprintf("%s/%s", addr, prefix) + c.Assert(query, Equals, "http://mock/pd/api/v1/store/1") + ret, err := json.Marshal(storeInfo) + c.Assert(err, IsNil) + return ret, nil + } + + pdController := &PdController{addrs: []string{"http://mock"}} + ctx := context.Background() + resp, err := pdController.getStoreInfoWith(ctx, mock, 1) + c.Assert(err, IsNil) + c.Assert(resp, NotNil) + c.Assert(resp.Status, NotNil) + c.Assert(resp.Store.StateName, Equals, "Tombstone") + c.Assert(uint64(resp.Status.Available), Equals, uint64(1024)) +}