diff --git a/pkg/txn/storage/tae/storage.go b/pkg/txn/storage/tae/storage.go index 68377fbac838d..e1b30fba6128f 100644 --- a/pkg/txn/storage/tae/storage.go +++ b/pkg/txn/storage/tae/storage.go @@ -54,7 +54,7 @@ func NewTAEStorage( } taeHandler := rpc.NewTAEHandle(ctx, dataDir, opt) tae := taeHandler.GetDB() - logtailer := logtail.NewLogtailer(ctx, tae.BGCheckpointRunner, tae.LogtailMgr, tae.Catalog) + logtailer := logtail.NewLogtailer(ctx, tae, tae.LogtailMgr, tae.Catalog) server, err := service.NewLogtailServer(logtailServerAddr, logtailServerCfg, logtailer, rt, nil) if err != nil { return nil, err diff --git a/pkg/vm/engine/tae/common/retry.go b/pkg/vm/engine/tae/common/retry.go index 3a59d89e040ad..b5d238290bbcc 100644 --- a/pkg/vm/engine/tae/common/retry.go +++ b/pkg/vm/engine/tae/common/retry.go @@ -27,7 +27,9 @@ type WaitOp = func() (ok bool, err error) func RetryWithIntervalAndTimeout( op WaitOp, timeout time.Duration, - interval time.Duration, suppressTimout bool) (err error) { + interval time.Duration, + suppressTimout bool, +) (err error) { ctx, cancel := context.WithTimeoutCause(context.Background(), timeout, moerr.CauseRetryWithIntervalAndTimeout) defer cancel() diff --git a/pkg/vm/engine/tae/db/checkpoint/flusher.go b/pkg/vm/engine/tae/db/checkpoint/flusher.go new file mode 100644 index 0000000000000..f4c3e1321d0ce --- /dev/null +++ b/pkg/vm/engine/tae/db/checkpoint/flusher.go @@ -0,0 +1,750 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package checkpoint + +import ( + "context" + "fmt" + "math/rand" + "slices" + "sync" + "sync/atomic" + "time" + + "github.com/matrixorigin/matrixone/pkg/common/moerr" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/logutil" + "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/util/fault" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/dbutils" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/sm" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/model" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/jobs" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" + "go.uber.org/zap" +) + +var ErrFlusherStopped = moerr.NewInternalErrorNoCtx("flusher stopped") + +type FlushCfg struct { + ForceFlushTimeout time.Duration + ForceFlushCheckInterval time.Duration + FlushInterval time.Duration + CronPeriod time.Duration +} + +type FlushMutableCfg struct { + ForceFlushTimeout time.Duration + ForceFlushCheckInterval time.Duration +} + +type Flusher interface { + IsAllChangesFlushed(start, end types.TS, doPrint bool) bool + FlushTable(ctx context.Context, dbID, tableID uint64, ts types.TS) error + ForceFlush(ts types.TS, ctx context.Context, duration time.Duration) error + ForceFlushWithInterval(ts types.TS, ctx context.Context, forceDuration, flushInterval time.Duration) (err error) + ChangeForceFlushTimeout(timeout time.Duration) + ChangeForceCheckInterval(interval time.Duration) + GetCfg() FlushCfg + Restart(opts ...FlusherOption) + IsStopped() bool + Start() + Stop() +} + +var _ Flusher = (*flusher)(nil) + +type FlushRequest struct { + force bool + tree *logtail.DirtyTreeEntry +} + +type FlusherOption func(*flushImpl) + +func WithFlusherCronPeriod(period time.Duration) FlusherOption { + return func(flusher *flushImpl) { + flusher.cronPeriod = period + } +} + +func WithFlusherInterval(interval time.Duration) FlusherOption { + return func(flusher *flushImpl) { + flusher.flushInterval = interval + } +} + +func WithFlusherQueueSize(size int) FlusherOption { + return func(flusher *flushImpl) { + flusher.flushQueueSize = size + } +} + +func WithFlusherCfg(cfg FlushCfg) FlusherOption { + return func(flusher *flushImpl) { + WithFlusherInterval(cfg.FlushInterval)(flusher) + WithFlusherCronPeriod(cfg.CronPeriod)(flusher) + WithFlusherForceTimeout(cfg.ForceFlushTimeout)(flusher) + WithFlusherForceCheckInterval(cfg.ForceFlushCheckInterval)(flusher) + } +} + +func WithFlusherForceTimeout(timeout time.Duration) FlusherOption { + return func(flusher *flushImpl) { + for { + var newCfg FlushMutableCfg + oldCfg := flusher.mutableCfg.Load() + if oldCfg == nil { + newCfg.ForceFlushTimeout = timeout + if flusher.mutableCfg.CompareAndSwap(oldCfg, &newCfg) { + break + } + } else { + newCfg = *oldCfg + newCfg.ForceFlushTimeout = timeout + if flusher.mutableCfg.CompareAndSwap(oldCfg, &newCfg) { + break + } + } + } + } +} + +func WithFlusherForceCheckInterval(interval time.Duration) FlusherOption { + return func(flusher *flushImpl) { + for { + var newCfg FlushMutableCfg + oldCfg := flusher.mutableCfg.Load() + if oldCfg == nil { + newCfg.ForceFlushCheckInterval = interval + if flusher.mutableCfg.CompareAndSwap(oldCfg, &newCfg) { + break + } + } else { + newCfg = *oldCfg + 
newCfg.ForceFlushCheckInterval = interval + if flusher.mutableCfg.CompareAndSwap(oldCfg, &newCfg) { + break + } + } + } + } +} + +type flusher struct { + rt *dbutils.Runtime + catalogCache *catalog.Catalog + sourcer logtail.Collector + checkpointSchduler CheckpointScheduler + + impl atomic.Pointer[flushImpl] +} + +func NewFlusher( + rt *dbutils.Runtime, + checkpointSchduler CheckpointScheduler, + catalogCache *catalog.Catalog, + sourcer logtail.Collector, + opts ...FlusherOption, +) Flusher { + flusher := &flusher{ + rt: rt, + checkpointSchduler: checkpointSchduler, + catalogCache: catalogCache, + sourcer: sourcer, + } + flusher.impl.Store(newFlusherImpl(rt, checkpointSchduler, catalogCache, sourcer, opts...)) + return flusher +} + +func (f *flusher) IsStopped() bool { + return f.impl.Load() == nil +} + +func (f *flusher) IsAllChangesFlushed(start, end types.TS, doPrint bool) bool { + impl := f.impl.Load() + if impl == nil { + return false + } + return impl.IsAllChangesFlushed(start, end, doPrint) +} + +func (f *flusher) Restart(opts ...FlusherOption) { + newImpl := newFlusherImpl( + f.rt, + f.checkpointSchduler, + f.catalogCache, + f.sourcer, + opts..., + ) + + for { + oldImpl := f.impl.Load() + if f.impl.CompareAndSwap(oldImpl, newImpl) { + if oldImpl != nil { + oldImpl.Stop() + } + break + } + } + newImpl.Start() +} + +func (f *flusher) FlushTable(ctx context.Context, dbID, tableID uint64, ts types.TS) error { + impl := f.impl.Load() + if impl == nil { + return ErrFlusherStopped + } + return impl.FlushTable(ctx, dbID, tableID, ts) +} + +func (f *flusher) ForceFlush(ts types.TS, ctx context.Context, duration time.Duration) error { + impl := f.impl.Load() + if impl == nil { + return ErrFlusherStopped + } + return impl.ForceFlush(ts, ctx, duration) +} + +func (f *flusher) ForceFlushWithInterval( + ts types.TS, ctx context.Context, forceDuration, flushInterval time.Duration, +) (err error) { + impl := f.impl.Load() + if impl == nil { + return ErrFlusherStopped + } + return impl.ForceFlushWithInterval(ts, ctx, forceDuration, flushInterval) +} + +func (f *flusher) ChangeForceFlushTimeout(timeout time.Duration) { + impl := f.impl.Load() + if impl == nil { + logutil.Warn("flusher stopped") + return + } + impl.ChangeForceFlushTimeout(timeout) +} + +func (f *flusher) ChangeForceCheckInterval(interval time.Duration) { + impl := f.impl.Load() + if impl == nil { + logutil.Warn("flusher stopped") + return + } + impl.ChangeForceCheckInterval(interval) +} + +func (f *flusher) GetCfg() FlushCfg { + impl := f.impl.Load() + if impl == nil { + return FlushCfg{} + } + return impl.GetCfg() +} + +func (f *flusher) Start() { + impl := f.impl.Load() + if impl == nil { + logutil.Warn("need restart") + return + } + impl.Start() +} + +func (f *flusher) Stop() { + impl := f.impl.Load() + if impl == nil { + return + } + if f.impl.CompareAndSwap(impl, nil) { + impl.Stop() + } +} + +type flushImpl struct { + mutableCfg atomic.Pointer[FlushMutableCfg] + flushInterval time.Duration + cronPeriod time.Duration + flushLag time.Duration + flushQueueSize int + + sourcer logtail.Collector + catalogCache *catalog.Catalog + checkpointSchduler CheckpointScheduler + rt *dbutils.Runtime + + cronTrigger *tasks.CancelableJob + flushRequestQ sm.Queue + + objMemSizeList []tableAndSize + + onceStart sync.Once + onceStop sync.Once +} + +func newFlusherImpl( + rt *dbutils.Runtime, + checkpointSchduler CheckpointScheduler, + catalogCache *catalog.Catalog, + sourcer logtail.Collector, + opts ...FlusherOption, +) *flushImpl { + flusher := 
&flushImpl{ + rt: rt, + checkpointSchduler: checkpointSchduler, + catalogCache: catalogCache, + sourcer: sourcer, + } + for _, opt := range opts { + opt(flusher) + } + + flusher.fillDefaults() + + flusher.flushRequestQ = sm.NewSafeQueue( + flusher.flushQueueSize, + 100, + flusher.onFlushRequest, + ) + + flusher.cronTrigger = tasks.NewCancelableCronJob( + "flusher", + flusher.cronPeriod, + flusher.triggerJob, + true, + 1, + ) + return flusher +} + +func (flusher *flushImpl) fillDefaults() { + if flusher.cronPeriod <= 0 { + flusher.cronPeriod = time.Second * 5 + } + cfg := flusher.mutableCfg.Load() + if cfg == nil { + cfg = new(FlushMutableCfg) + flusher.mutableCfg.Store(cfg) + } + if cfg.ForceFlushTimeout <= 0 { + cfg.ForceFlushTimeout = time.Second * 90 + } + if cfg.ForceFlushCheckInterval <= 0 { + cfg.ForceFlushCheckInterval = time.Millisecond * 500 + } + if flusher.flushInterval <= 0 { + flusher.flushInterval = time.Minute + } + // TODO: what is flushLag? Here just refactoring the original code. + if flusher.flushLag <= 0 { + if flusher.flushInterval < time.Second { + flusher.flushLag = 0 + } else { + flusher.flushLag = time.Second * 3 + } + } + + if flusher.flushQueueSize <= 0 { + flusher.flushQueueSize = 1000 + } +} + +func (flusher *flushImpl) triggerJob(ctx context.Context) { + flusher.sourcer.Run(flusher.flushLag) + entry := flusher.sourcer.GetAndRefreshMerged() + if !entry.IsEmpty() { + request := new(FlushRequest) + request.tree = entry + flusher.flushRequestQ.Enqueue(request) + } + _, endTS := entry.GetTimeRange() + flusher.checkpointSchduler.TryScheduleCheckpoint(endTS) +} + +func (flusher *flushImpl) onFlushRequest(items ...any) { + fromCrons := logtail.NewEmptyDirtyTreeEntry() + fromForce := logtail.NewEmptyDirtyTreeEntry() + for _, item := range items { + e := item.(*FlushRequest) + if e.force { + fromForce.Merge(e.tree) + } else { + fromCrons.Merge(e.tree) + } + } + flusher.scheduleFlush(fromForce, true) + flusher.scheduleFlush(fromCrons, false) +} + +func (flusher *flushImpl) scheduleFlush( + entry *logtail.DirtyTreeEntry, + force bool, +) { + if entry.IsEmpty() { + return + } + pressure := flusher.collectTableMemUsage(entry) + flusher.checkFlushConditionAndFire(entry, force, pressure) +} + +func (flusher *flushImpl) EstimateTableMemSize( + table *catalog.TableEntry, + tree *model.TableTree, +) (asize int, dsize int) { + for _, obj := range tree.Objs { + object, err := table.GetObjectByID(obj.ID, false) + if err != nil { + panic(err) + } + a, _ := object.GetObjectData().EstimateMemSize() + asize += a + } + for _, obj := range tree.Tombstones { + object, err := table.GetObjectByID(obj.ID, true) + if err != nil { + panic(err) + } + a, _ := object.GetObjectData().EstimateMemSize() + dsize += a + } + return +} + +func (flusher *flushImpl) collectTableMemUsage( + entry *logtail.DirtyTreeEntry, +) (memPressureRate float64) { + // reuse the list + flusher.objMemSizeList = flusher.objMemSizeList[:0] + sizevisitor := new(model.BaseTreeVisitor) + var totalSize int + sizevisitor.TableFn = func(did, tid uint64) error { + db, err := flusher.catalogCache.GetDatabaseByID(did) + if err != nil { + panic(err) + } + table, err := db.GetTableEntryByID(tid) + if err != nil { + panic(err) + } + table.Stats.Init(flusher.flushInterval) + dirtyTree := entry.GetTree().GetTable(tid) + asize, dsize := flusher.EstimateTableMemSize(table, dirtyTree) + totalSize += asize + dsize + flusher.objMemSizeList = append(flusher.objMemSizeList, tableAndSize{table, asize, dsize}) + return 
moerr.GetOkStopCurrRecur() + } + if err := entry.GetTree().Visit(sizevisitor); err != nil { + panic(err) + } + + slices.SortFunc(flusher.objMemSizeList, func(a, b tableAndSize) int { + return b.asize - a.asize // sort by asize desc + }) + + pressure := float64(totalSize) / float64(common.RuntimeOverallFlushMemCap.Load()) + if pressure > 1.0 { + pressure = 1.0 + } + logutil.Info( + "Flush-CollectMemUsage", + zap.Float64("pressure", pressure), + zap.String("size", common.HumanReadableBytes(totalSize)), + ) + + return pressure +} + +func (flusher *flushImpl) fireFlushTabletail( + table *catalog.TableEntry, + tree *model.TableTree, +) error { + tableDesc := fmt.Sprintf("%d-%s", table.ID, table.GetLastestSchemaLocked(false).Name) + metas := make([]*catalog.ObjectEntry, 0, 10) + for _, obj := range tree.Objs { + object, err := table.GetObjectByID(obj.ID, false) + if err != nil { + panic(err) + } + metas = append(metas, object) + } + tombstoneMetas := make([]*catalog.ObjectEntry, 0, 10) + for _, obj := range tree.Tombstones { + object, err := table.GetObjectByID(obj.ID, true) + if err != nil { + panic(err) + } + tombstoneMetas = append(tombstoneMetas, object) + } + + // freeze all append + scopes := make([]common.ID, 0, len(metas)) + for _, meta := range metas { + if !meta.GetObjectData().PrepareCompact() { + logutil.Info("[FlushTabletail] data prepareCompact false", zap.String("table", tableDesc), zap.String("obj", meta.ID().String())) + return moerr.GetOkExpectedEOB() + } + scopes = append(scopes, *meta.AsCommonID()) + } + for _, meta := range tombstoneMetas { + if !meta.GetObjectData().PrepareCompact() { + logutil.Info("[FlushTabletail] tomb prepareCompact false", zap.String("table", tableDesc), zap.String("obj", meta.ID().String())) + return moerr.GetOkExpectedEOB() + } + scopes = append(scopes, *meta.AsCommonID()) + } + + factory := jobs.FlushTableTailTaskFactory(metas, tombstoneMetas, flusher.rt) + if _, err := flusher.rt.Scheduler.ScheduleMultiScopedTxnTask(nil, tasks.FlushTableTailTask, scopes, factory); err != nil { + if err != tasks.ErrScheduleScopeConflict { + logutil.Error("[FlushTabletail] Sched Failure", zap.String("table", tableDesc), zap.Error(err)) + } + return moerr.GetOkExpectedEOB() + } + return nil +} + +func (flusher *flushImpl) checkFlushConditionAndFire( + entry *logtail.DirtyTreeEntry, force bool, pressure float64, +) { + count := 0 + for _, ticket := range flusher.objMemSizeList { + table, asize, dsize := ticket.tbl, ticket.asize, ticket.dsize + dirtyTree := entry.GetTree().GetTable(table.ID) + + if force { + logutil.Info( + "Flush-Force", + zap.Uint64("id", table.ID), + zap.String("name", table.GetLastestSchemaLocked(false).Name), + ) + if err := flusher.fireFlushTabletail(table, dirtyTree); err == nil { + table.Stats.ResetDeadline(flusher.flushInterval) + } + continue + } + + flushReady := func() bool { + if !table.IsActive() { + count++ + if pressure < 0.5 || count < 200 { + // if the table has been dropped, flush it immediately if + // resources are available. 
+ // count is used to avoid too many flushes in one round + return true + } + return false + } + // time to flush + if table.Stats.GetFlushDeadline().Before(time.Now()) { + return true + } + // this table is too large, flush it + if asize+dsize > int(common.FlushMemCapacity.Load()) { + return true + } + // unflushed data is too large, flush it + if asize > common.Const1MBytes && rand.Float64() < pressure { + return true + } + return false + } + + ready := flushReady() + + if asize+dsize > 2*1000*1024 { + logutil.Info( + "Flush-Tabletail", + zap.String("name", table.GetLastestSchemaLocked(false).Name), + zap.String("size", common.HumanReadableBytes(asize+dsize)), + zap.String("dsize", common.HumanReadableBytes(dsize)), + zap.Duration("count-down", time.Until(table.Stats.GetFlushDeadline())), + zap.Bool("ready", ready), + ) + } + + if ready { + if err := flusher.fireFlushTabletail(table, dirtyTree); err == nil { + table.Stats.ResetDeadline(flusher.flushInterval) + } + } + } +} + +func (flusher *flushImpl) ChangeForceFlushTimeout(timeout time.Duration) { + WithFlusherForceTimeout(timeout)(flusher) +} + +func (flusher *flushImpl) ChangeForceCheckInterval(interval time.Duration) { + WithFlusherForceCheckInterval(interval)(flusher) +} + +func (flusher *flushImpl) ForceFlush( + ts types.TS, ctx context.Context, forceDuration time.Duration, +) (err error) { + return flusher.ForceFlushWithInterval( + ts, ctx, forceDuration, 0, + ) +} + +func (flusher *flushImpl) ForceFlushWithInterval( + ts types.TS, ctx context.Context, forceDuration, flushInterval time.Duration, +) (err error) { + makeRequest := func() *FlushRequest { + tree := flusher.sourcer.ScanInRangePruned(types.TS{}, ts) + tree.GetTree().Compact() + if tree.IsEmpty() { + return nil + } + entry := logtail.NewDirtyTreeEntry(types.TS{}, ts, tree.GetTree()) + request := new(FlushRequest) + request.tree = entry + request.force = true + // logutil.Infof("try flush %v",tree.String()) + return request + } + op := func() (ok bool, err error) { + request := makeRequest() + if request == nil { + return true, nil + } + if _, err = flusher.flushRequestQ.Enqueue(request); err != nil { + return true, nil + } + return false, nil + } + + cfg := flusher.mutableCfg.Load() + + if forceDuration <= 0 { + forceDuration = cfg.ForceFlushTimeout + } + if flushInterval <= 0 { + flushInterval = cfg.ForceFlushCheckInterval + } + if err = common.RetryWithIntervalAndTimeout( + op, + forceDuration, + flushInterval, + false, + ); err != nil { + return moerr.NewInternalErrorf(ctx, "force flush failed: %v", err) + } + _, sarg, _ := fault.TriggerFault(objectio.FJ_FlushTimeout) + if sarg != "" { + err = moerr.NewInternalError(ctx, sarg) + } + return + +} + +func (flusher *flushImpl) GetCfg() FlushCfg { + var cfg FlushCfg + mCfg := flusher.mutableCfg.Load() + cfg.ForceFlushTimeout = mCfg.ForceFlushTimeout + cfg.ForceFlushCheckInterval = mCfg.ForceFlushCheckInterval + cfg.FlushInterval = flusher.flushInterval + cfg.CronPeriod = flusher.cronPeriod + return cfg +} + +func (flusher *flushImpl) FlushTable( + ctx context.Context, dbID, tableID uint64, ts types.TS, +) (err error) { + iarg, sarg, flush := fault.TriggerFault("flush_table_error") + if flush && (iarg == 0 || rand.Int63n(iarg) == 0) { + return moerr.NewInternalError(ctx, sarg) + } + makeRequest := func() *FlushRequest { + tree := flusher.sourcer.ScanInRangePruned(types.TS{}, ts) + tree.GetTree().Compact() + tableTree := tree.GetTree().GetTable(tableID) + if tableTree == nil { + return nil + } + nTree := model.NewTree() + 
nTree.Tables[tableID] = tableTree + entry := logtail.NewDirtyTreeEntry(types.TS{}, ts, nTree) + request := new(FlushRequest) + request.tree = entry + request.force = true + return request + } + + op := func() (ok bool, err error) { + request := makeRequest() + if request == nil { + return true, nil + } + if _, err = flusher.flushRequestQ.Enqueue(request); err != nil { + // TODO: why (true,nil)??? + return true, nil + } + return false, nil + } + + cfg := flusher.mutableCfg.Load() + + err = common.RetryWithIntervalAndTimeout( + op, + cfg.ForceFlushTimeout, + cfg.ForceFlushCheckInterval, + true, + ) + if moerr.IsMoErrCode(err, moerr.ErrInternal) || moerr.IsMoErrCode(err, moerr.OkExpectedEOB) { + logutil.Warnf("Flush %d-%d :%v", dbID, tableID, err) + return nil + } + return +} + +func (flusher *flushImpl) IsAllChangesFlushed( + start, end types.TS, doPrint bool, +) bool { + tree := flusher.sourcer.ScanInRangePruned(start, end) + tree.GetTree().Compact() + if doPrint && !tree.IsEmpty() { + logutil.Info( + "IsAllChangesFlushed", + zap.String("dirty-tree", tree.String()), + ) + } + return tree.IsEmpty() +} + +func (flusher *flushImpl) Start() { + flusher.onceStart.Do(func() { + flusher.flushRequestQ.Start() + flusher.cronTrigger.Start() + cfg := flusher.mutableCfg.Load() + logutil.Info( + "flushImpl-Started", + zap.Duration("cron-period", flusher.cronPeriod), + zap.Duration("flush-interval", flusher.flushInterval), + zap.Duration("flush-lag", flusher.flushLag), + zap.Duration("force-flush-timeout", cfg.ForceFlushTimeout), + zap.Duration("force-flush-check-interval", cfg.ForceFlushCheckInterval), + ) + }) +} + +func (flusher *flushImpl) Stop() { + flusher.onceStop.Do(func() { + flusher.cronTrigger.Stop() + flusher.flushRequestQ.Stop() + logutil.Info("flushImpl-Stopped") + }) +} diff --git a/pkg/vm/engine/tae/db/checkpoint/flusher_test.go b/pkg/vm/engine/tae/db/checkpoint/flusher_test.go new file mode 100644 index 0000000000000..439b03db020a4 --- /dev/null +++ b/pkg/vm/engine/tae/db/checkpoint/flusher_test.go @@ -0,0 +1,62 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package checkpoint + +import ( + "context" + "testing" + "time" + + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/stretchr/testify/assert" +) + +func Test_RestartFlusher(t *testing.T) { + var cfg FlushCfg + cfg.ForceFlushTimeout = time.Millisecond * 7 + cfg.ForceFlushCheckInterval = time.Millisecond * 9 + cfg.FlushInterval = time.Millisecond * 11 + cfg.CronPeriod = time.Millisecond * 2 + f := NewFlusher( + nil, nil, nil, nil, + WithFlusherInterval(cfg.FlushInterval), + WithFlusherCronPeriod(cfg.CronPeriod), + WithFlusherForceTimeout(cfg.ForceFlushTimeout), + WithFlusherForceCheckInterval(cfg.ForceFlushCheckInterval), + ) + + fCfg := f.GetCfg() + assert.Equal(t, cfg, fCfg) + assert.False(t, f.IsStopped()) + + f.Stop() + assert.True(t, f.IsStopped()) + + ctx := context.Background() + var ts types.TS + + assert.Equal(t, ErrFlusherStopped, f.FlushTable(ctx, 0, 0, ts)) + assert.Equal(t, ErrFlusherStopped, f.ForceFlush(ts, ctx, time.Millisecond)) + assert.Equal(t, ErrFlusherStopped, f.ForceFlushWithInterval(ts, ctx, time.Millisecond, time.Millisecond)) + f.ChangeForceCheckInterval(time.Millisecond) + f.ChangeForceFlushTimeout(time.Millisecond) + f.Start() + assert.True(t, f.IsStopped()) + + f.Restart(WithFlusherCfg(cfg)) + assert.False(t, f.IsStopped()) + fCfg = f.GetCfg() + assert.Equal(t, cfg, fCfg) +} diff --git a/pkg/vm/engine/tae/db/checkpoint/option.go b/pkg/vm/engine/tae/db/checkpoint/option.go index b54e8936f06d5..7928b36a6a42f 100644 --- a/pkg/vm/engine/tae/db/checkpoint/option.go +++ b/pkg/vm/engine/tae/db/checkpoint/option.go @@ -54,18 +54,6 @@ func WithGlobalMinCount(count int) Option { } } -func WithForceFlushTimeout(to time.Duration) Option { - return func(r *runner) { - r.options.forceFlushTimeout = to - } -} - -func WithForceFlushCheckInterval(interval time.Duration) Option { - return func(r *runner) { - r.options.forceFlushCheckInterval = interval - } -} - func WithGlobalVersionInterval(interval time.Duration) Option { return func(r *runner) { r.options.globalVersionInterval = interval diff --git a/pkg/vm/engine/tae/db/checkpoint/runner.go b/pkg/vm/engine/tae/db/checkpoint/runner.go index d9e588520f5b0..e7754cd6f3772 100644 --- a/pkg/vm/engine/tae/db/checkpoint/runner.go +++ b/pkg/vm/engine/tae/db/checkpoint/runner.go @@ -18,8 +18,6 @@ import ( "bytes" "context" "fmt" - "math/rand" - "slices" "strconv" "strings" "sync" @@ -31,27 +29,19 @@ import ( v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" "go.uber.org/zap" - "github.com/matrixorigin/matrixone/pkg/util/fault" - "github.com/matrixorigin/matrixone/pkg/perfcounter" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/dbutils" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/model" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/jobs" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" "github.com/matrixorigin/matrixone/pkg/common/moerr" - "github.com/matrixorigin/matrixone/pkg/common/stopper" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/sm" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" - w 
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal" "github.com/tidwall/btree" ) @@ -185,11 +175,6 @@ type runner struct { // minimum count of uncheckpointed transactions allowed before the next checkpoint minCount int - forceFlushTimeout time.Duration - forceFlushCheckInterval time.Duration - - dirtyEntryQueueSize int - waitQueueSize int checkpointQueueSize int checkpointBlockRows int @@ -208,8 +193,6 @@ type runner struct { wal wal.Driver disabled atomic.Bool - stopper *stopper.Stopper - // memory storage of the checkpoint entries storage struct { sync.RWMutex @@ -224,15 +207,11 @@ type runner struct { incrementalPolicy *timeBasedPolicy globalPolicy *countBasedPolicy - dirtyEntryQueue sm.Queue - waitQueue sm.Queue incrementalCheckpointQueue sm.Queue globalCheckpointQueue sm.Queue postCheckpointQueue sm.Queue gcCheckpointQueue sm.Queue - objMemSizeList []tableAndSize - checkpointMetaFiles struct { sync.RWMutex files map[string]struct{} @@ -274,14 +253,12 @@ func NewRunner( r.incrementalPolicy = &timeBasedPolicy{interval: r.options.minIncrementalInterval} r.globalPolicy = &countBasedPolicy{minCount: r.options.globalMinCount} - r.stopper = stopper.NewStopper("CheckpointRunner") - r.dirtyEntryQueue = sm.NewSafeQueue(r.options.dirtyEntryQueueSize, 100, r.onDirtyEntries) - r.waitQueue = sm.NewSafeQueue(r.options.waitQueueSize, 100, r.onWaitWaitableItems) r.incrementalCheckpointQueue = sm.NewSafeQueue(r.options.checkpointQueueSize, 100, r.onIncrementalCheckpointEntries) r.globalCheckpointQueue = sm.NewSafeQueue(r.options.checkpointQueueSize, 100, r.onGlobalCheckpointEntries) r.gcCheckpointQueue = sm.NewSafeQueue(100, 100, r.onGCCheckpointEntries) r.postCheckpointQueue = sm.NewSafeQueue(1000, 1, r.onPostCheckpointEntries) r.checkpointMetaFiles.files = make(map[string]struct{}) + return r } @@ -294,10 +271,6 @@ func (r *runner) String() string { _, _ = fmt.Fprintf(&buf, "globalMinCount=%v, ", r.options.globalMinCount) _, _ = fmt.Fprintf(&buf, "globalVersionInterval=%v, ", r.options.globalVersionInterval) _, _ = fmt.Fprintf(&buf, "minCount=%v, ", r.options.minCount) - _, _ = fmt.Fprintf(&buf, "forceFlushTimeout=%v, ", r.options.forceFlushTimeout) - _, _ = fmt.Fprintf(&buf, "forceFlushCheckInterval=%v, ", r.options.forceFlushCheckInterval) - _, _ = fmt.Fprintf(&buf, "dirtyEntryQueueSize=%v, ", r.options.dirtyEntryQueueSize) - _, _ = fmt.Fprintf(&buf, "waitQueueSize=%v, ", r.options.waitQueueSize) _, _ = fmt.Fprintf(&buf, "checkpointQueueSize=%v, ", r.options.checkpointQueueSize) _, _ = fmt.Fprintf(&buf, "checkpointBlockRows=%v, ", r.options.checkpointBlockRows) _, _ = fmt.Fprintf(&buf, "checkpointSize=%v, ", r.options.checkpointSize) @@ -331,13 +304,6 @@ func (r *runner) GetCheckpointMetaFiles() map[string]struct{} { return files } -// Only used in UT -func (r *runner) DebugUpdateOptions(opts ...Option) { - for _, opt := range opts { - opt(r) - } -} - func (r *runner) onGlobalCheckpointEntries(items ...any) { maxEnd := types.TS{} for _, item := range items { @@ -527,49 +493,6 @@ func (r *runner) DeleteGlobalEntry(entry *CheckpointEntry) { }) } -func (r *runner) FlushTable(ctx context.Context, dbID, tableID uint64, ts types.TS) (err error) { - iarg, sarg, flush := fault.TriggerFault("flush_table_error") - if flush && (iarg == 0 || rand.Int63n(iarg) == 0) { - return moerr.NewInternalError(ctx, sarg) - } - makeCtx := func() *DirtyCtx { - tree := r.source.ScanInRangePruned(types.TS{}, ts) - tree.GetTree().Compact() - tableTree := 
tree.GetTree().GetTable(tableID) - if tableTree == nil { - return nil - } - nTree := model.NewTree() - nTree.Tables[tableID] = tableTree - entry := logtail.NewDirtyTreeEntry(types.TS{}, ts, nTree) - dirtyCtx := new(DirtyCtx) - dirtyCtx.tree = entry - dirtyCtx.force = true - return dirtyCtx - } - - op := func() (ok bool, err error) { - dirtyCtx := makeCtx() - if dirtyCtx == nil { - return true, nil - } - if _, err = r.dirtyEntryQueue.Enqueue(dirtyCtx); err != nil { - return true, nil - } - return false, nil - } - - err = common.RetryWithIntervalAndTimeout( - op, - r.options.forceFlushTimeout, - r.options.forceFlushCheckInterval, true) - if moerr.IsMoErrCode(err, moerr.ErrInternal) || moerr.IsMoErrCode(err, moerr.OkExpectedEOB) { - logutil.Warnf("Flush %d-%d :%v", dbID, tableID, err) - return nil - } - return -} - func (r *runner) saveCheckpoint(start, end types.TS, ckpLSN, truncateLSN uint64) (name string, err error) { bat := r.collectCheckpointMetadata(start, end, ckpLSN, truncateLSN) defer bat.Close() @@ -813,7 +736,7 @@ func (r *runner) tryScheduleIncrementalCheckpoint(start, end types.TS) { r.tryAddNewIncrementalCheckpointEntry(entry) } -func (r *runner) tryScheduleCheckpoint(endts types.TS) { +func (r *runner) TryScheduleCheckpoint(endts types.TS) { if r.disabled.Load() { return } @@ -870,22 +793,10 @@ func (r *runner) tryScheduleCheckpoint(endts types.TS) { } func (r *runner) fillDefaults() { - if r.options.forceFlushTimeout <= 0 { - r.options.forceFlushTimeout = time.Second * 90 - } - if r.options.forceFlushCheckInterval <= 0 { - r.options.forceFlushCheckInterval = time.Millisecond * 400 - } if r.options.collectInterval <= 0 { // TODO: define default value r.options.collectInterval = time.Second * 5 } - if r.options.dirtyEntryQueueSize <= 0 { - r.options.dirtyEntryQueueSize = 10000 - } - if r.options.waitQueueSize <= 1000 { - r.options.waitQueueSize = 1000 - } if r.options.checkpointQueueSize <= 1000 { r.options.checkpointQueueSize = 1000 } @@ -906,270 +817,21 @@ func (r *runner) fillDefaults() { } } -func (r *runner) onWaitWaitableItems(items ...any) { - // TODO: change for more waitable items - start := time.Now() - for _, item := range items { - ckpEntry := item.(wal.LogEntry) - err := ckpEntry.WaitDone() - if err != nil { - panic(err) - } - ckpEntry.Free() - } - logutil.Debugf("Total [%d] WAL Checkpointed | [%s]", len(items), time.Since(start)) -} - -func (r *runner) fireFlushTabletail(table *catalog.TableEntry, tree *model.TableTree) error { - tableDesc := fmt.Sprintf("%d-%s", table.ID, table.GetLastestSchemaLocked(false).Name) - metas := make([]*catalog.ObjectEntry, 0, 10) - for _, obj := range tree.Objs { - object, err := table.GetObjectByID(obj.ID, false) - if err != nil { - panic(err) - } - metas = append(metas, object) - } - tombstoneMetas := make([]*catalog.ObjectEntry, 0, 10) - for _, obj := range tree.Tombstones { - object, err := table.GetObjectByID(obj.ID, true) - if err != nil { - panic(err) - } - tombstoneMetas = append(tombstoneMetas, object) - } - - // freeze all append - scopes := make([]common.ID, 0, len(metas)) - for _, meta := range metas { - if !meta.GetObjectData().PrepareCompact() { - logutil.Info("[FlushTabletail] data prepareCompact false", zap.String("table", tableDesc), zap.String("obj", meta.ID().String())) - return moerr.GetOkExpectedEOB() - } - scopes = append(scopes, *meta.AsCommonID()) - } - for _, meta := range tombstoneMetas { - if !meta.GetObjectData().PrepareCompact() { - logutil.Info("[FlushTabletail] tomb prepareCompact false", 
zap.String("table", tableDesc), zap.String("obj", meta.ID().String())) - return moerr.GetOkExpectedEOB() - } - scopes = append(scopes, *meta.AsCommonID()) - } - - factory := jobs.FlushTableTailTaskFactory(metas, tombstoneMetas, r.rt) - if _, err := r.rt.Scheduler.ScheduleMultiScopedTxnTask(nil, tasks.FlushTableTailTask, scopes, factory); err != nil { - if err != tasks.ErrScheduleScopeConflict { - logutil.Error("[FlushTabletail] Sched Failure", zap.String("table", tableDesc), zap.Error(err)) - } - return moerr.GetOkExpectedEOB() - } - return nil -} - -func (r *runner) EstimateTableMemSize(table *catalog.TableEntry, tree *model.TableTree) (asize int, dsize int) { - for _, obj := range tree.Objs { - object, err := table.GetObjectByID(obj.ID, false) - if err != nil { - panic(err) - } - a, _ := object.GetObjectData().EstimateMemSize() - asize += a - } - for _, obj := range tree.Tombstones { - object, err := table.GetObjectByID(obj.ID, true) - if err != nil { - panic(err) - } - a, _ := object.GetObjectData().EstimateMemSize() - dsize += a - } - return -} - -func (r *runner) collectTableMemUsage(entry *logtail.DirtyTreeEntry) (memPressureRate float64) { - // reuse the list - r.objMemSizeList = r.objMemSizeList[:0] - sizevisitor := new(model.BaseTreeVisitor) - var totalSize int - sizevisitor.TableFn = func(did, tid uint64) error { - db, err := r.catalog.GetDatabaseByID(did) - if err != nil { - panic(err) - } - table, err := db.GetTableEntryByID(tid) - if err != nil { - panic(err) - } - table.Stats.Init(r.options.maxFlushInterval) - dirtyTree := entry.GetTree().GetTable(tid) - asize, dsize := r.EstimateTableMemSize(table, dirtyTree) - totalSize += asize + dsize - r.objMemSizeList = append(r.objMemSizeList, tableAndSize{table, asize, dsize}) - return moerr.GetOkStopCurrRecur() - } - if err := entry.GetTree().Visit(sizevisitor); err != nil { - panic(err) - } - - slices.SortFunc(r.objMemSizeList, func(a, b tableAndSize) int { - return b.asize - a.asize // sort by asize desc - }) - - pressure := float64(totalSize) / float64(common.RuntimeOverallFlushMemCap.Load()) - if pressure > 1.0 { - pressure = 1.0 - } - logutil.Info( - "Flush-CollectMemUsage", - zap.Float64("pressure", pressure), - zap.String("size", common.HumanReadableBytes(totalSize)), - ) - - return pressure -} - -func (r *runner) checkFlushConditionAndFire(entry *logtail.DirtyTreeEntry, force bool, pressure float64) { - count := 0 - for _, ticket := range r.objMemSizeList { - table, asize, dsize := ticket.tbl, ticket.asize, ticket.dsize - dirtyTree := entry.GetTree().GetTable(table.ID) - - if force { - logutil.Info( - "Flush-Force", - zap.Uint64("id", table.ID), - zap.String("name", table.GetLastestSchemaLocked(false).Name), - ) - if err := r.fireFlushTabletail(table, dirtyTree); err == nil { - table.Stats.ResetDeadline(r.options.maxFlushInterval) - } - continue - } - - flushReady := func() bool { - if !table.IsActive() { - count++ - if pressure < 0.5 || count < 200 { - // if the table has been dropped, flush it immediately if - // resources are available. 
- // count is used to avoid too many flushes in one round - return true - } - return false - } - // time to flush - if table.Stats.GetFlushDeadline().Before(time.Now()) { - return true - } - // this table is too large, flush it - if asize+dsize > int(common.FlushMemCapacity.Load()) { - return true - } - // unflushed data is too large, flush it - if asize > common.Const1MBytes && rand.Float64() < pressure { - return true - } - return false - } - - ready := flushReady() - - if asize+dsize > 2*1000*1024 { - logutil.Info( - "Flush-Tabletail", - zap.String("name", table.GetLastestSchemaLocked(false).Name), - zap.String("size", common.HumanReadableBytes(asize+dsize)), - zap.String("dsize", common.HumanReadableBytes(dsize)), - zap.Duration("count-down", time.Until(table.Stats.GetFlushDeadline())), - zap.Bool("ready", ready), - ) - } - - if ready { - if err := r.fireFlushTabletail(table, dirtyTree); err == nil { - table.Stats.ResetDeadline(r.options.maxFlushInterval) - } - } - } -} - -// for a non-forced dirty tree, it contains all unflushed data in the db at the latest moment -func (r *runner) scheduleFlush(entry *logtail.DirtyTreeEntry, force bool) { - if entry.IsEmpty() { - return - } - // logutil.Debug(entry.String()) - - pressure := r.collectTableMemUsage(entry) - r.checkFlushConditionAndFire(entry, force, pressure) -} - -func (r *runner) onDirtyEntries(entries ...any) { - normal := logtail.NewEmptyDirtyTreeEntry() - force := logtail.NewEmptyDirtyTreeEntry() - for _, entry := range entries { - e := entry.(*DirtyCtx) - if e.force { - force.Merge(e.tree) - } else { - normal.Merge(e.tree) - } - } - r.scheduleFlush(force, true) - r.scheduleFlush(normal, false) -} - -func (r *runner) crontask(ctx context.Context) { - // friendly for freezing objects, avoiding fierece refer cnt compectition - lag := 3 * time.Second - if r.options.maxFlushInterval < time.Second { - // test env, no need to lag - lag = 0 * time.Second - } - hb := w.NewHeartBeaterWithFunc(r.options.collectInterval, func() { - r.source.Run(lag) - entry := r.source.GetAndRefreshMerged() - if !entry.IsEmpty() { - e := new(DirtyCtx) - e.tree = entry - r.dirtyEntryQueue.Enqueue(e) - } - _, endts := entry.GetTimeRange() - r.tryScheduleCheckpoint(endts) - }, nil) - hb.Start() - <-ctx.Done() - hb.Stop() -} - -func (r *runner) EnqueueWait(item any) (err error) { - _, err = r.waitQueue.Enqueue(item) - return -} - func (r *runner) Start() { r.onceStart.Do(func() { r.postCheckpointQueue.Start() r.incrementalCheckpointQueue.Start() r.globalCheckpointQueue.Start() r.gcCheckpointQueue.Start() - r.dirtyEntryQueue.Start() - r.waitQueue.Start() - if err := r.stopper.RunNamedTask("dirty-collector-job", r.crontask); err != nil { - panic(err) - } }) } func (r *runner) Stop() { r.onceStop.Do(func() { - r.stopper.Stop() - r.dirtyEntryQueue.Stop() r.incrementalCheckpointQueue.Stop() r.globalCheckpointQueue.Stop() r.gcCheckpointQueue.Stop() r.postCheckpointQueue.Stop() - r.waitQueue.Stop() }) } diff --git a/pkg/vm/engine/tae/db/checkpoint/testutils.go b/pkg/vm/engine/tae/db/checkpoint/testutils.go index 774219ebfce68..b6bd7cfaacfaf 100644 --- a/pkg/vm/engine/tae/db/checkpoint/testutils.go +++ b/pkg/vm/engine/tae/db/checkpoint/testutils.go @@ -21,8 +21,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/logutil" - "github.com/matrixorigin/matrixone/pkg/objectio" - "github.com/matrixorigin/matrixone/pkg/util/fault" 
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/dbutils" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" @@ -39,14 +37,11 @@ type TestRunner interface { ForceGlobalCheckpointSynchronously(ctx context.Context, end types.TS, versionInterval time.Duration) error ForceCheckpointForBackup(end types.TS) (string, error) ForceIncrementalCheckpoint(end types.TS, truncate bool) error - IsAllChangesFlushed(start, end types.TS, printTree bool) bool MaxLSNInRange(end types.TS) uint64 ExistPendingEntryToGC() bool MaxGlobalCheckpoint() *CheckpointEntry MaxIncrementalCheckpoint() *CheckpointEntry - ForceFlush(ts types.TS, ctx context.Context, duration time.Duration) (err error) - ForceFlushWithInterval(ts types.TS, ctx context.Context, forceDuration, flushInterval time.Duration) (err error) GetDirtyCollector() logtail.Collector } @@ -158,59 +153,15 @@ func (r *runner) ForceGlobalCheckpointSynchronously(ctx context.Context, end typ err := common.RetryWithIntervalAndTimeout( op, time.Minute, - r.options.forceFlushCheckInterval, false) + time.Millisecond*400, + false, + ) if err != nil { return moerr.NewInternalErrorf(ctx, "force global checkpoint failed: %v", err) } return nil } -func (r *runner) ForceFlushWithInterval(ts types.TS, ctx context.Context, forceDuration, flushInterval time.Duration) (err error) { - makeCtx := func() *DirtyCtx { - tree := r.source.ScanInRangePruned(types.TS{}, ts) - tree.GetTree().Compact() - if tree.IsEmpty() { - return nil - } - entry := logtail.NewDirtyTreeEntry(types.TS{}, ts, tree.GetTree()) - dirtyCtx := new(DirtyCtx) - dirtyCtx.tree = entry - dirtyCtx.force = true - // logutil.Infof("try flush %v",tree.String()) - return dirtyCtx - } - op := func() (ok bool, err error) { - dirtyCtx := makeCtx() - if dirtyCtx == nil { - return true, nil - } - if _, err = r.dirtyEntryQueue.Enqueue(dirtyCtx); err != nil { - return true, nil - } - return false, nil - } - - if forceDuration == 0 { - forceDuration = r.options.forceFlushTimeout - } - err = common.RetryWithIntervalAndTimeout( - op, - forceDuration, - flushInterval, false) - if err != nil { - return moerr.NewInternalErrorf(ctx, "force flush failed: %v", err) - } - _, sarg, _ := fault.TriggerFault(objectio.FJ_FlushTimeout) - if sarg != "" { - err = moerr.NewInternalError(ctx, sarg) - } - return - -} -func (r *runner) ForceFlush(ts types.TS, ctx context.Context, forceDuration time.Duration) (err error) { - return r.ForceFlushWithInterval(ts, ctx, forceDuration, r.options.forceFlushCheckInterval) -} - func (r *runner) ForceIncrementalCheckpoint(end types.TS, truncate bool) error { now := time.Now() prev := r.MaxIncrementalCheckpoint() @@ -364,12 +315,3 @@ func (r *runner) ForceCheckpointForBackup(end types.TS) (location string, err er logutil.Infof("checkpoint for backup %s, takes %s", entry.String(), time.Since(now)) return location, nil } - -func (r *runner) IsAllChangesFlushed(start, end types.TS, printTree bool) bool { - tree := r.source.ScanInRangePruned(start, end) - tree.GetTree().Compact() - if printTree && !tree.IsEmpty() { - logutil.Infof("%v", tree.String()) - } - return tree.IsEmpty() -} diff --git a/pkg/vm/engine/tae/db/checkpoint/types.go b/pkg/vm/engine/tae/db/checkpoint/types.go index 49bf3f3412bb6..992695ab42cb8 100644 --- a/pkg/vm/engine/tae/db/checkpoint/types.go +++ b/pkg/vm/engine/tae/db/checkpoint/types.go @@ -21,7 +21,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" 
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" ) type State int8 @@ -41,29 +40,26 @@ const ( ET_Compacted ) +type CheckpointScheduler interface { + TryScheduleCheckpoint(types.TS) +} + type Runner interface { + CheckpointScheduler TestRunner RunnerReader Start() Stop() String() string - EnqueueWait(any) error Replay(catalog.DataFactory) *CkpReplayer - FlushTable(ctx context.Context, dbID, tableID uint64, ts types.TS) error GCByTS(ctx context.Context, ts types.TS) error // for test, delete in next phase - DebugUpdateOptions(opts ...Option) GetAllCheckpoints() []*CheckpointEntry GetAllCheckpointsForBackup(compact *CheckpointEntry) []*CheckpointEntry } -type DirtyCtx struct { - force bool - tree *logtail.DirtyTreeEntry -} - type Observer interface { OnNewCheckpoint(ts types.TS) } diff --git a/pkg/vm/engine/tae/db/controller.go b/pkg/vm/engine/tae/db/controller.go index f8bedb80ecc42..db0580d88aeb6 100644 --- a/pkg/vm/engine/tae/db/controller.go +++ b/pkg/vm/engine/tae/db/controller.go @@ -23,6 +23,7 @@ import ( "github.com/google/uuid" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/logutil" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/sm" "go.uber.org/zap" ) @@ -35,6 +36,70 @@ const ( ControlCmd_ToWriteMode ) +type stepFunc struct { + fn func() error + desc string +} + +type stepFuncs []stepFunc + +func (s stepFuncs) Apply(msg string, reversed bool, logLevel int) (err error) { + now := time.Now() + defer func() { + if logLevel > 0 { + logutil.Info( + msg, + zap.String("step", "all"), + zap.Duration("duration", time.Since(now)), + zap.Error(err), + ) + } + }() + + if reversed { + for i := len(s) - 1; i >= 0; i-- { + start := time.Now() + if err = s[i].fn(); err != nil { + logutil.Error( + msg, + zap.String("step", s[i].desc), + zap.Error(err), + ) + return + } + if logLevel > 1 { + logutil.Info( + msg, + zap.String("step", s[i].desc), + zap.Duration("duration", time.Since(start)), + ) + } + } + } else { + for i := 0; i < len(s); i++ { + if err = s[i].fn(); err != nil { + logutil.Error( + msg, + zap.String("step", s[i].desc), + zap.Error(err), + ) + return + } + if logLevel > 1 { + logutil.Info( + msg, + zap.String("step", s[i].desc), + ) + } + } + } + return nil +} + +func (s stepFuncs) Push(ss ...stepFunc) stepFuncs { + return append(s, ss...) 
+} + func newControlCmd( ctx context.Context, typ ControlCmdType, @@ -118,8 +183,9 @@ func (c *Controller) handleToReplayCmd(cmd *controlCmd) { // TODO: error handling var ( - err error - start time.Time = time.Now() + err error + start time.Time = time.Now() + rollbackSteps stepFuncs ) ctx, cancel := context.WithTimeout(cmd.ctx, 10*time.Minute) @@ -132,13 +198,21 @@ func (c *Controller) handleToReplayCmd(cmd *controlCmd) { ) defer func() { - if err != nil { + err2 := err + if err2 != nil { + err = rollbackSteps.Apply("DB-SwitchToReplay-Rollback", true, 1) + } + if err2 != nil { logger = logutil.Error } + if err != nil { + logger = logutil.Fatal + } logger( "DB-SwitchToReplay-Done", zap.String("cmd", cmd.String()), zap.Duration("duration", time.Since(start)), + zap.Any("rollback-error", err2), zap.Error(err), ) cmd.setError(err) @@ -157,6 +231,19 @@ func (c *Controller) handleToReplayCmd(cmd *controlCmd) { return } // 2.x TODO: checkpoint runner + flushCfg := c.db.BGFlusher.GetCfg() + c.db.BGFlusher.Stop() + rollbackSteps.Push( + struct { + fn func() error + desc string + }{ + fn: func() error { + c.db.BGFlusher.Restart(checkpoint.WithFlusherCfg(flushCfg)) + return nil + }, + desc: "start bg flusher", + }) // 3. build forward write request tunnel to the new write candidate // TODO @@ -210,8 +297,9 @@ func (c *Controller) handleToWriteCmd(cmd *controlCmd) { ) } var ( - err error - start time.Time = time.Now() + err error + start time.Time = time.Now() + rollbackSteps stepFuncs ) ctx, cancel := context.WithTimeout(cmd.ctx, 10*time.Minute) @@ -224,13 +312,21 @@ func (c *Controller) handleToWriteCmd(cmd *controlCmd) { ) defer func() { - if err != nil { + err2 := err + if err2 != nil { + err = rollbackSteps.Apply("DB-SwitchToWrite-Rollback", true, 1) + } + if err2 != nil { logger = logutil.Error } + if err != nil { + logger = logutil.Fatal + } logger( "DB-SwitchToWrite-Done", zap.String("cmd", cmd.String()), zap.Duration("duration", time.Since(start)), + zap.Any("rollback-error", err2), zap.Error(err), ) cmd.setError(err) @@ -252,7 +348,22 @@ func (c *Controller) handleToWriteCmd(cmd *controlCmd) { // TODO // 5. 
start merge scheduler|checkpoint|diskcleaner - // 5.1 switch the diskcleaner to write mode + // 5.1 TODO: start the merger|checkpoint|flusher + c.db.BGFlusher.Restart() // TODO: Restart with new config + rollbackSteps.Push( + struct { + fn func() error + desc string + }{ + fn: func() error { + c.db.BGFlusher.Stop() + return nil + }, + desc: "stop bg flusher", + }, + ) + + // 5.2 switch the diskcleaner to write mode if err = c.db.DiskCleaner.SwitchToWriteMode(ctx); err != nil { // Rollback return diff --git a/pkg/vm/engine/tae/db/db.go b/pkg/vm/engine/tae/db/db.go index 82938a594bb67..6492ab11b45be 100644 --- a/pkg/vm/engine/tae/db/db.go +++ b/pkg/vm/engine/tae/db/db.go @@ -105,6 +105,7 @@ type DB struct { BGScanner wb.IHeartbeater BGCheckpointRunner checkpoint.Runner + BGFlusher checkpoint.Flusher MergeScheduler *merge.Scheduler @@ -141,15 +142,29 @@ func (db *DB) GetUsageMemo() *logtail.TNUsageMemo { return db.usageMemo } +func (db *DB) CollectCheckpointsInRange( + ctx context.Context, start, end types.TS, +) (ckpLoc string, lastEnd types.TS, err error) { + return db.BGCheckpointRunner.CollectCheckpointsInRange(ctx, start, end) +} + func (db *DB) FlushTable( ctx context.Context, tenantID uint32, dbId, tableId uint64, ts types.TS) (err error) { - err = db.BGCheckpointRunner.FlushTable(ctx, dbId, tableId, ts) + err = db.BGFlusher.FlushTable(ctx, dbId, tableId, ts) return } +func (db *DB) ForceFlush( + ts types.TS, ctx context.Context, forceDuration time.Duration, +) (err error) { + return db.BGFlusher.ForceFlush( + ts, ctx, forceDuration, + ) +} + func (db *DB) ForceCheckpoint( ctx context.Context, ts types.TS, @@ -163,7 +178,7 @@ func (db *DB) ForceCheckpoint( flushDuration = time.Minute * 3 / 2 } t0 := time.Now() - err = db.BGCheckpointRunner.ForceFlush(ts, ctx, flushDuration) + err = db.BGFlusher.ForceFlush(ts, ctx, flushDuration) forceFlushCost := time.Since(t0) defer func() { @@ -219,7 +234,7 @@ func (db *DB) ForceGlobalCheckpoint( defer db.BGCheckpointRunner.EnableCheckpoint() db.BGCheckpointRunner.CleanPenddingCheckpoint() t0 := time.Now() - err = db.BGCheckpointRunner.ForceFlush(ts, ctx, flushDuration) + err = db.BGFlusher.ForceFlush(ts, ctx, flushDuration) forceFlushCost := time.Since(t0) defer func() { logger := logutil.Info @@ -256,7 +271,7 @@ func (db *DB) ForceCheckpointForBackup( defer db.BGCheckpointRunner.EnableCheckpoint() db.BGCheckpointRunner.CleanPenddingCheckpoint() t0 := time.Now() - err = db.BGCheckpointRunner.ForceFlush(ts, ctx, flushDuration) + err = db.BGFlusher.ForceFlush(ts, ctx, flushDuration) forceFlushCost := time.Since(t0) defer func() { @@ -355,6 +370,7 @@ func (db *DB) Close() error { db.Controller.Stop() db.CronJobs.Reset() db.BGScanner.Stop() + db.BGFlusher.Stop() db.BGCheckpointRunner.Stop() db.Runtime.Scheduler.Stop() db.TxnMgr.Stop() diff --git a/pkg/vm/engine/tae/db/open.go b/pkg/vm/engine/tae/db/open.go index 7177c9ecff697..bf1e6802486f0 100644 --- a/pkg/vm/engine/tae/db/open.go +++ b/pkg/vm/engine/tae/db/open.go @@ -202,15 +202,22 @@ func Open( db.Catalog, logtail.NewDirtyCollector(db.LogtailMgr, db.Opts.Clock, db.Catalog, new(catalog.LoopProcessor)), db.Wal, - checkpoint.WithFlushInterval(opts.CheckpointCfg.FlushInterval), - checkpoint.WithCollectInterval(opts.CheckpointCfg.ScanInterval), checkpoint.WithMinCount(int(opts.CheckpointCfg.MinCount)), checkpoint.WithCheckpointBlockRows(opts.CheckpointCfg.BlockRows), checkpoint.WithCheckpointSize(opts.CheckpointCfg.Size), checkpoint.WithMinIncrementalInterval(opts.CheckpointCfg.IncrementalInterval), 
checkpoint.WithGlobalMinCount(int(opts.CheckpointCfg.GlobalMinCount)), checkpoint.WithGlobalVersionInterval(opts.CheckpointCfg.GlobalVersionInterval), - checkpoint.WithReserveWALEntryCount(opts.CheckpointCfg.ReservedWALEntryCount)) + checkpoint.WithReserveWALEntryCount(opts.CheckpointCfg.ReservedWALEntryCount), + ) + db.BGFlusher = checkpoint.NewFlusher( + db.Runtime, + db.BGCheckpointRunner, + db.Catalog, + db.BGCheckpointRunner.GetDirtyCollector(), + checkpoint.WithFlusherInterval(opts.CheckpointCfg.FlushInterval), + checkpoint.WithFlusherCronPeriod(opts.CheckpointCfg.ScanInterval), + ) now := time.Now() ckpReplayer := db.BGCheckpointRunner.Replay(dataFactory) @@ -267,6 +274,7 @@ func Open( scanner.RegisterOp(db.MergeScheduler) db.Wal.Start() db.BGCheckpointRunner.Start() + db.BGFlusher.Start() db.BGScanner = w.NewHeartBeater( opts.CheckpointCfg.ScanInterval, diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index 9e5734ec2fe41..efa62d0ea3f93 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -2623,7 +2623,7 @@ func (c *dummyCpkGetter) CollectCheckpointsInRange(ctx context.Context, start, e return "", types.TS{}, nil } -func (c *dummyCpkGetter) FlushTable(ctx context.Context, dbID, tableID uint64, ts types.TS) error { +func (c *dummyCpkGetter) FlushTable(ctx context.Context, accoutID uint32, dbID, tableID uint64, ts types.TS) error { return nil } @@ -4531,8 +4531,7 @@ func TestFlushTable(t *testing.T) { tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) defer tae.Close() - tae.BGCheckpointRunner.DebugUpdateOptions( - checkpoint.WithForceFlushCheckInterval(time.Millisecond * 5)) + tae.BGFlusher.ChangeForceCheckInterval(time.Millisecond * 5) schema := catalog.MockSchemaAll(3, 1) schema.Extra.BlockMaxRows = 10 @@ -8022,7 +8021,7 @@ func TestForceCheckpoint(t *testing.T) { tae.CreateRelAndAppend(bat, true) - err = tae.BGCheckpointRunner.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10) + err = tae.BGFlusher.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10) assert.Error(t, err) err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now(), false) assert.NoError(t, err) diff --git a/pkg/vm/engine/tae/db/test/replay_test.go b/pkg/vm/engine/tae/db/test/replay_test.go index 037519244f55e..1bae1ed51faaa 100644 --- a/pkg/vm/engine/tae/db/test/replay_test.go +++ b/pkg/vm/engine/tae/db/test/replay_test.go @@ -459,7 +459,7 @@ func TestReplay2(t *testing.T) { assert.NotNil(t, err) assert.Nil(t, txn.Commit(context.Background())) - err = tae2.BGCheckpointRunner.ForceFlush(tae2.TxnMgr.Now(), context.Background(), time.Second*10) + err = tae2.ForceFlush(tae2.TxnMgr.Now(), context.Background(), time.Second*10) assert.NoError(t, err) err = tae2.BGCheckpointRunner.ForceIncrementalCheckpoint(tae2.TxnMgr.Now(), false) assert.NoError(t, err) @@ -814,7 +814,7 @@ func TestReplay5(t *testing.T) { assert.NoError(t, txn.Commit(context.Background())) testutil.CompactBlocks(t, 0, tae, testutil.DefaultTestDB, schema, false) - err = tae.BGCheckpointRunner.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10) + err = tae.BGFlusher.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10) assert.NoError(t, err) err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now(), false) assert.NoError(t, err) @@ -841,7 +841,7 @@ func 
TestReplay5(t *testing.T) { } assert.NoError(t, txn.Commit(context.Background())) testutil.CompactBlocks(t, 0, tae, testutil.DefaultTestDB, schema, false) - err = tae.BGCheckpointRunner.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10) + err = tae.BGFlusher.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10) assert.NoError(t, err) err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now(), false) assert.NoError(t, err) @@ -878,7 +878,7 @@ func TestReplay5(t *testing.T) { assert.True(t, moerr.IsMoErrCode(err, moerr.ErrDuplicateEntry)) assert.NoError(t, txn.Commit(context.Background())) - err = tae.BGCheckpointRunner.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10) + err = tae.BGFlusher.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10) assert.NoError(t, err) err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now(), false) assert.NoError(t, err) @@ -939,7 +939,7 @@ func TestReplay6(t *testing.T) { assert.NoError(t, txn.Commit(context.Background())) testutil.CompactBlocks(t, 0, tae, testutil.DefaultTestDB, schema, false) testutil.MergeBlocks(t, 0, tae, testutil.DefaultTestDB, schema, false) - err = tae.BGCheckpointRunner.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10) + err = tae.BGFlusher.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10) assert.NoError(t, err) err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now(), false) assert.NoError(t, err) @@ -1098,7 +1098,7 @@ func TestReplay8(t *testing.T) { assert.NoError(t, err) assert.NoError(t, txn.Commit(context.Background())) // t.Log(tae.Catalog.SimplePPString(common.PPL1)) - err = tae.BGCheckpointRunner.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10) + err = tae.BGFlusher.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10) assert.NoError(t, err) tae.Restart(ctx) diff --git a/pkg/vm/engine/tae/db/testutil/engine.go b/pkg/vm/engine/tae/db/testutil/engine.go index f04e84eaafceb..c422d9538fd9a 100644 --- a/pkg/vm/engine/tae/db/testutil/engine.go +++ b/pkg/vm/engine/tae/db/testutil/engine.go @@ -162,21 +162,21 @@ func (e *TestEngine) CheckRowsByScan(exp int, applyDelete bool) { assert.NoError(e.T, txn.Commit(context.Background())) } func (e *TestEngine) ForceCheckpoint() { - err := e.BGCheckpointRunner.ForceFlushWithInterval(e.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10) + err := e.BGFlusher.ForceFlushWithInterval(e.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10) assert.NoError(e.T, err) err = e.BGCheckpointRunner.ForceIncrementalCheckpoint(e.TxnMgr.Now(), false) assert.NoError(e.T, err) } func (e *TestEngine) ForceLongCheckpoint() { - err := e.BGCheckpointRunner.ForceFlush(e.TxnMgr.Now(), context.Background(), 20*time.Second) + err := e.BGFlusher.ForceFlush(e.TxnMgr.Now(), context.Background(), 20*time.Second) assert.NoError(e.T, err) err = e.BGCheckpointRunner.ForceIncrementalCheckpoint(e.TxnMgr.Now(), false) assert.NoError(e.T, err) } func (e *TestEngine) ForceLongCheckpointTruncate() { - err := e.BGCheckpointRunner.ForceFlush(e.TxnMgr.Now(), context.Background(), 20*time.Second) + err := e.BGFlusher.ForceFlush(e.TxnMgr.Now(), context.Background(), 20*time.Second) 
assert.NoError(e.T, err) err = e.BGCheckpointRunner.ForceIncrementalCheckpoint(e.TxnMgr.Now(), true) assert.NoError(e.T, err) @@ -303,10 +303,10 @@ func (e *TestEngine) IncrementalCheckpoint( } if waitFlush { testutils.WaitExpect(4000, func() bool { - flushed := e.DB.BGCheckpointRunner.IsAllChangesFlushed(types.TS{}, end, false) + flushed := e.DB.BGFlusher.IsAllChangesFlushed(types.TS{}, end, false) return flushed }) - flushed := e.DB.BGCheckpointRunner.IsAllChangesFlushed(types.TS{}, end, true) + flushed := e.DB.BGFlusher.IsAllChangesFlushed(types.TS{}, end, true) require.True(e.T, flushed) } err := e.DB.BGCheckpointRunner.ForceIncrementalCheckpoint(end, false) diff --git a/pkg/vm/engine/tae/logtail/handle.go b/pkg/vm/engine/tae/logtail/handle.go index 219fae96df5b6..429e164eb1f3c 100644 --- a/pkg/vm/engine/tae/logtail/handle.go +++ b/pkg/vm/engine/tae/logtail/handle.go @@ -98,7 +98,7 @@ const Size90M = 90 * 1024 * 1024 type CheckpointClient interface { CollectCheckpointsInRange(ctx context.Context, start, end types.TS) (ckpLoc string, lastEnd types.TS, err error) - FlushTable(ctx context.Context, dbID, tableID uint64, ts types.TS) error + FlushTable(ctx context.Context, accoutID uint32, dbID, tableID uint64, ts types.TS) error } func HandleSyncLogTailReq( @@ -165,7 +165,7 @@ func HandleSyncLogTailReq( if canRetry { // check simple conditions first _, name, forceFlush := fault.TriggerFault("logtail_max_size") if (forceFlush && name == tableEntry.GetLastestSchemaLocked(false).Name) || resp.ProtoSize() > Size90M { - flushErr := ckpClient.FlushTable(ctx, did, tid, end) + flushErr := ckpClient.FlushTable(ctx, 0, did, tid, end) // try again after flushing newResp, closeCB, err := HandleSyncLogTailReq(ctx, ckpClient, mgr, c, req, false) logutil.Info( diff --git a/pkg/vm/engine/tae/rpc/handle.go b/pkg/vm/engine/tae/rpc/handle.go index 64a850c6349f4..06469ce34a946 100644 --- a/pkg/vm/engine/tae/rpc/handle.go +++ b/pkg/vm/engine/tae/rpc/handle.go @@ -524,7 +524,7 @@ func (h *Handle) HandleGetLogTail( resp *api.SyncLogTailResp) (closeCB func(), err error) { res, closeCB, err := logtail.HandleSyncLogTailReq( ctx, - h.db.BGCheckpointRunner, + h.db, h.db.LogtailMgr, h.db.Catalog, *req, diff --git a/pkg/vm/engine/test/disttae_engine_test.go b/pkg/vm/engine/test/disttae_engine_test.go index 975beb89cd16f..db004aaea5baa 100644 --- a/pkg/vm/engine/test/disttae_engine_test.go +++ b/pkg/vm/engine/test/disttae_engine_test.go @@ -232,7 +232,7 @@ func (c *dummyCpkGetter) CollectCheckpointsInRange(ctx context.Context, start, e return "", types.TS{}, nil } -func (c *dummyCpkGetter) FlushTable(ctx context.Context, dbID, tableID uint64, ts types.TS) error { +func (c *dummyCpkGetter) FlushTable(ctx context.Context, _ uint32, dbID, tableID uint64, ts types.TS) error { return nil } diff --git a/pkg/vm/engine/test/testutil/logtailserver.go b/pkg/vm/engine/test/testutil/logtailserver.go index c01cb4c14d83b..eef8d111ca7bb 100644 --- a/pkg/vm/engine/test/testutil/logtailserver.go +++ b/pkg/vm/engine/test/testutil/logtailserver.go @@ -69,7 +69,7 @@ func NewMockLogtailServer( ls := &TestLogtailServer{} - logtailer := taelogtail.NewLogtailer(ctx, tae.BGCheckpointRunner, tae.LogtailMgr, tae.Catalog) + logtailer := taelogtail.NewLogtailer(ctx, tae, tae.LogtailMgr, tae.Catalog) server, err := service.NewLogtailServer("", cfg, logtailer, rt, rpcServerFactory) if err != nil { return nil, err
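
Editor's note: the new checkpoint.Flusher introduced by this patch wraps its working state in an atomic.Pointer[flushImpl], so Stop, Restart, and every public method can race safely and callers see ErrFlusherStopped once the component is stopped. The stand-alone Go sketch below (hypothetical Worker/workerImpl names, not MatrixOne code) illustrates only that stop/restart-through-an-atomic-pointer pattern; it is a minimal sketch under those assumptions, not the actual flusher implementation.

// Sketch of the restartable-component pattern used by the flusher wrapper:
// the public object holds an atomic.Pointer to an inner implementation;
// Stop swaps it to nil, Restart installs a fresh implementation, and every
// call loads the pointer first and fails fast when the component is stopped.
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errStopped = errors.New("component stopped")

type workerImpl struct {
	interval int // hypothetical config carried by the inner implementation
}

func (w *workerImpl) Do() string { return fmt.Sprintf("working every %d ms", w.interval) }

type Worker struct {
	impl atomic.Pointer[workerImpl]
}

func NewWorker(interval int) *Worker {
	w := &Worker{}
	w.impl.Store(&workerImpl{interval: interval})
	return w
}

func (w *Worker) Do() (string, error) {
	impl := w.impl.Load()
	if impl == nil {
		return "", errStopped
	}
	return impl.Do(), nil
}

func (w *Worker) Stop() {
	// CAS so a concurrent Restart cannot be clobbered by a stale Stop.
	for {
		impl := w.impl.Load()
		if impl == nil {
			return
		}
		if w.impl.CompareAndSwap(impl, nil) {
			return
		}
	}
}

func (w *Worker) Restart(interval int) {
	newImpl := &workerImpl{interval: interval}
	for {
		old := w.impl.Load()
		if w.impl.CompareAndSwap(old, newImpl) {
			// in the real flusher the old implementation, if any, is stopped here
			return
		}
	}
}

func main() {
	w := NewWorker(5)
	fmt.Println(w.Do())
	w.Stop()
	if _, err := w.Do(); err != nil {
		fmt.Println("after Stop:", err) // after Stop: component stopped
	}
	w.Restart(7)
	fmt.Println(w.Do())
}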
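
Editor's note: controller.go also adds a stepFuncs list that handleToReplayCmd/handleToWriteCmd push undo steps onto and unwind in reverse order when a mode switch fails (stepFuncs.Apply with reversed=true). The following self-contained sketch (hypothetical step names, no MatrixOne dependencies) shows only that rollback idea, not the controller itself.

// Sketch of the rollback-step pattern: each forward action that succeeds
// pushes an undo step; on a later failure the accumulated steps run
// last-in-first-out.
package main

import (
	"errors"
	"fmt"
)

type step struct {
	fn   func() error
	desc string
}

type steps []step

// applyReversed runs the undo steps in reverse order and stops at the first error.
func (s steps) applyReversed() error {
	for i := len(s) - 1; i >= 0; i-- {
		if err := s[i].fn(); err != nil {
			return fmt.Errorf("rollback %q: %w", s[i].desc, err)
		}
		fmt.Println("rolled back:", s[i].desc)
	}
	return nil
}

func main() {
	var rollback steps

	// forward step 1: pretend to stop a background worker, and register its undo
	fmt.Println("stop background flusher")
	rollback = append(rollback, step{
		fn:   func() error { fmt.Println("restart background flusher"); return nil },
		desc: "restart background flusher",
	})

	// forward step 2: pretend it fails, so everything done so far is unwound
	if err := errors.New("switch mode failed"); err != nil {
		if rbErr := rollback.applyReversed(); rbErr != nil {
			fmt.Println("rollback failed:", rbErr)
		}
	}
}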