diff --git a/pkg/vm/engine/engine_util/sinker.go b/pkg/vm/engine/engine_util/sinker.go index 25c34002768de..f29474b19345c 100644 --- a/pkg/vm/engine/engine_util/sinker.go +++ b/pkg/vm/engine/engine_util/sinker.go @@ -590,6 +590,11 @@ func (sinker *Sinker) Write( } func (sinker *Sinker) Sync(ctx context.Context) error { + select { + case <-ctx.Done(): + return context.Cause(ctx) + default: + } if len(sinker.staged.persisted) == 0 && len(sinker.staged.inMemory) == 0 { return nil } diff --git a/pkg/vm/engine/engine_util/sinker_test.go b/pkg/vm/engine/engine_util/sinker_test.go index 63d85834b4fcf..fb20f1a1bfdd8 100644 --- a/pkg/vm/engine/engine_util/sinker_test.go +++ b/pkg/vm/engine/engine_util/sinker_test.go @@ -19,6 +19,7 @@ import ( "fmt" "testing" + "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/defines" "github.com/matrixorigin/matrixone/pkg/fileservice" @@ -93,34 +94,46 @@ func TestNewSinker(t *testing.T) { require.Equal(t, 0, int(proc.Mp().CurrNB())) } -func TestNewSinker2(t *testing.T) { - proc := testutil.NewProc() - fs, err := fileservice.Get[fileservice.FileService]( - proc.GetFileService(), defines.SharedFileServiceName) - require.NoError(t, err) - - schema := catalog.MockSchema(3, 2) - seqnums := make([]uint16, len(schema.Attrs())) - for i := range schema.Attrs() { - seqnums[i] = schema.GetSeqnum(schema.Attrs()[i]) +func makeTestSinker( + inSchema *catalog.Schema, + mp *mpool.MPool, + fs fileservice.FileService, +) (outSchema *catalog.Schema, sinker *Sinker) { + outSchema = inSchema + if outSchema == nil { + outSchema = catalog.MockSchema(3, 2) + } + seqnums := make([]uint16, len(outSchema.Attrs())) + for i := range outSchema.Attrs() { + seqnums[i] = outSchema.GetSeqnum(outSchema.Attrs()[i]) } factory := NewFSinkerImplFactory( seqnums, - schema.GetPrimaryKey().Idx, + outSchema.GetPrimaryKey().Idx, true, false, - schema.Version, + outSchema.Version, ) - sinker := NewSinker( - schema.GetPrimaryKey().Idx, - schema.Attrs(), - schema.Types(), + sinker = NewSinker( + outSchema.GetPrimaryKey().Idx, + outSchema.Attrs(), + outSchema.Types(), factory, - proc.Mp(), + mp, fs, ) + return +} + +func TestNewSinker2(t *testing.T) { + proc := testutil.NewProc() + fs, err := fileservice.Get[fileservice.FileService]( + proc.GetFileService(), defines.SharedFileServiceName) + require.NoError(t, err) + + schema, sinker := makeTestSinker(nil, proc.Mp(), fs) for i := 0; i < 5; i++ { bat := catalog.MockBatch(schema, 8192*2) @@ -156,3 +169,12 @@ func TestNewSinker2(t *testing.T) { require.Equal(t, 0, int(proc.Mp().CurrNB())) } + +func TestSinkerCancel(t *testing.T) { + var sinker Sinker + ctx, cancel := context.WithCancelCause(context.Background()) + expectErr := moerr.NewInternalErrorNoCtx("tt") + cancel(expectErr) + err := sinker.Sync(ctx) + require.ErrorContains(t, err, expectErr.Error()) +} diff --git a/pkg/vm/engine/tae/db/controller.go b/pkg/vm/engine/tae/db/controller.go index 533d34e51fcf8..f8bedb80ecc42 100644 --- a/pkg/vm/engine/tae/db/controller.go +++ b/pkg/vm/engine/tae/db/controller.go @@ -122,6 +122,9 @@ func (c *Controller) handleToReplayCmd(cmd *controlCmd) { start time.Time = time.Now() ) + ctx, cancel := context.WithTimeout(cmd.ctx, 10*time.Minute) + defer cancel() + logger := logutil.Info logger( "DB-SwitchToReplay-Start", @@ -145,7 +148,15 @@ func (c *Controller) handleToReplayCmd(cmd *controlCmd) { // TODO // 2. 
switch the checkpoint|diskcleaner to replay mode - // TODO + + // 2.1 remove GC disk cron job. no new GC job will be issued from now on + RemoveCronJob(c.db, CronJobs_Name_GCDisk) + RemoveCronJob(c.db, CronJobs_Name_GCCheckpoint) + if err = c.db.DiskCleaner.SwitchToReplayMode(ctx); err != nil { + // Rollback + return + } + // 2.x TODO: checkpoint runner // 3. build forward write request tunnel to the new write candidate // TODO @@ -175,6 +186,10 @@ func (c *Controller) handleToReplayCmd(cmd *controlCmd) { // 10. forward the write requests to the new write candidate // TODO + if err = CheckCronJobs(c.db, DBTxnMode_Replay); err != nil { + // rollback + return + } // 11. replay the log entries from the logservice // 11.1 switch the txn mode to replay mode c.db.TxnMgr.ToReplayMode() @@ -199,6 +214,9 @@ func (c *Controller) handleToWriteCmd(cmd *controlCmd) { start time.Time = time.Now() ) + ctx, cancel := context.WithTimeout(cmd.ctx, 10*time.Minute) + defer cancel() + logger := logutil.Info logger( "DB-SwitchToWrite-Start", @@ -234,7 +252,28 @@ func (c *Controller) handleToWriteCmd(cmd *controlCmd) { // TODO // 5. start merge scheduler|checkpoint|diskcleaner - // TODO + // 5.1 switch the diskcleaner to write mode + if err = c.db.DiskCleaner.SwitchToWriteMode(ctx); err != nil { + // Rollback + return + } + if err = AddCronJob( + c.db, CronJobs_Name_GCDisk, true, + ); err != nil { + // Rollback + return + } + if err = AddCronJob( + c.db, CronJobs_Name_GCCheckpoint, true, + ); err != nil { + // Rollback + return + } + if err = CheckCronJobs(c.db, DBTxnMode_Write); err != nil { + // Rollback + return + } + // 5.x TODO WithTxnMode(DBTxnMode_Write)(c.db) } diff --git a/pkg/vm/engine/tae/db/cronjobs.go b/pkg/vm/engine/tae/db/cronjobs.go new file mode 100644 index 0000000000000..2a37d53f9a84d --- /dev/null +++ b/pkg/vm/engine/tae/db/cronjobs.go @@ -0,0 +1,265 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
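The controller hunks above (handleToReplayCmd / handleToWriteCmd) run the mode-switch steps in order and bail out on the first error, with the "// Rollback" branches still left as TODO in this patch. A minimal, self-contained sketch of the apply-or-rollback shape those steps imply is below; `switchStep` and `runSteps` are hypothetical names invented for illustration, not symbols from this diff.

```go
// Sketch only: orchestrate ordered switch steps and undo completed ones on failure.
package main

import (
	"context"
	"errors"
	"fmt"
)

type switchStep struct {
	name     string
	apply    func(context.Context) error
	rollback func(context.Context)
}

func runSteps(ctx context.Context, steps []switchStep) (err error) {
	applied := make([]switchStep, 0, len(steps))
	defer func() {
		if err == nil {
			return
		}
		// undo in reverse order so the DB ends up back in its previous mode
		for i := len(applied) - 1; i >= 0; i-- {
			applied[i].rollback(ctx)
		}
	}()
	for _, s := range steps {
		if err = s.apply(ctx); err != nil {
			return fmt.Errorf("switch step %q: %w", s.name, err)
		}
		applied = append(applied, s)
	}
	return nil
}

func main() {
	steps := []switchStep{
		{
			name:     "diskcleaner-to-write",
			apply:    func(context.Context) error { fmt.Println("switch cleaner"); return nil },
			rollback: func(context.Context) { fmt.Println("switch cleaner back") },
		},
		{
			name:     "add-gc-cron-jobs",
			apply:    func(context.Context) error { return errors.New("add job failed") },
			rollback: func(context.Context) { fmt.Println("remove cron jobs") },
		},
	}
	if err := runSteps(context.Background(), steps); err != nil {
		fmt.Println("switch aborted:", err)
	}
}
```

The design point this mirrors is that a failed SwitchToWriteMode/AddCronJob sequence should leave neither half-registered cron jobs nor a half-switched cleaner behind.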
+ +package db + +import ( + "context" + "time" + + "github.com/matrixorigin/matrixone/pkg/common/moerr" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/logutil" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" + "go.uber.org/zap" +) + +const ( + CronJobs_Name_GCTransferTable = "GC-Transfer-Table" + CronJobs_Name_GCDisk = "GC-Disk" + CronJobs_Name_GCCheckpoint = "GC-Checkpoint" + CronJobs_Name_GCCatalogCache = "GC-Catalog-Cache" + CronJobs_Name_GCLogtail = "GC-Logtail" + CronJobs_Name_GCLockMerge = "GC-Lock-Merge" + + CronJobs_Name_ReportStats = "Report-Stats" + + CronJobs_Name_Checker = "Checker" +) + +var CronJobs_Open_WriteMode = []string{ + CronJobs_Name_GCTransferTable, + CronJobs_Name_GCDisk, + CronJobs_Name_GCCheckpoint, + CronJobs_Name_GCCatalogCache, + CronJobs_Name_GCLogtail, + CronJobs_Name_GCLockMerge, + CronJobs_Name_ReportStats, +} + +var CronJobs_Open_ReplayMode = []string{ + CronJobs_Name_GCTransferTable, + CronJobs_Name_GCCatalogCache, + CronJobs_Name_GCLogtail, + CronJobs_Name_ReportStats, +} + +// key(string): cron job name +// value(bool,bool,bool,bool): +// 1.bool,2.bool: can be in the write mode, must be in the write mode +// 3.bool,4.bool: can be in the replay mode, must be in the replay mode +var CronJobs_Spec = map[string][]bool{ + CronJobs_Name_GCTransferTable: {true, true, true, true}, + CronJobs_Name_GCDisk: {true, true, false, false}, + CronJobs_Name_GCCheckpoint: {true, true, false, false}, + CronJobs_Name_GCCatalogCache: {true, true, true, true}, + CronJobs_Name_GCLogtail: {true, true, true, true}, + CronJobs_Name_GCLockMerge: {true, true, true, false}, + CronJobs_Name_ReportStats: {true, true, true, true}, + CronJobs_Name_Checker: {true, false, true, false}, +} + +func CanAddCronJob(name string, isWriteModeDB, skipMode bool) bool { + if v, ok := CronJobs_Spec[name]; ok { + if skipMode { + return true + } + if isWriteModeDB { + return v[0] + } + return v[2] + } + return false +} + +func AddCronJobs(db *DB) (err error) { + isWriteMode := db.IsWriteMode() + if isWriteMode { + for _, name := range CronJobs_Open_WriteMode { + if err = AddCronJob(db, name, false); err != nil { + return + } + } + } else { + for _, name := range CronJobs_Open_ReplayMode { + if err = AddCronJob(db, name, false); err != nil { + return + } + } + } + if db.Opts.CheckpointCfg.MetadataCheckInterval > 0 { + if err = AddCronJob(db, CronJobs_Name_Checker, false); err != nil { + return + } + } + err = CheckCronJobs(db, db.GetTxnMode()) + return +} + +func AddCronJob(db *DB, name string, skipMode bool) (err error) { + if !CanAddCronJob(name, db.IsWriteMode(), skipMode) { + return moerr.NewInternalErrorNoCtxf( + "cannot add cron job %s in %s mode", name, db.GetTxnMode(), + ) + } + + switch name { + case CronJobs_Name_GCTransferTable: + err = db.CronJobs.AddJob( + CronJobs_Name_GCTransferTable, + db.Opts.CheckpointCfg.TransferInterval, + func(context.Context) { + db.Runtime.PoolUsageReport() + db.Runtime.TransferDelsMap.Prune(db.Opts.TransferTableTTL) + db.Runtime.TransferTable.RunTTL() + }, + 1, + ) + return + case CronJobs_Name_GCDisk: + err = db.CronJobs.AddJob( + CronJobs_Name_GCDisk, + db.Opts.GCCfg.ScanGCInterval, + func(ctx context.Context) { + db.DiskCleaner.GC(ctx) + }, + 1, + ) + return + case CronJobs_Name_GCCheckpoint: + err = db.CronJobs.AddJob( + CronJobs_Name_GCCheckpoint, + db.Opts.CheckpointCfg.GCCheckpointInterval, + func(ctx context.Context) { + if 
db.Opts.CheckpointCfg.DisableGCCheckpoint { + return + } + gcWaterMark := db.DiskCleaner.GetCleaner().GetCheckpointGCWaterMark() + if gcWaterMark == nil { + return + } + if err := db.BGCheckpointRunner.GCByTS(ctx, *gcWaterMark); err != nil { + logutil.Error( + "GC-Checkpoint-Err", + zap.Error(err), + ) + } + }, + 1, + ) + return + case CronJobs_Name_GCCatalogCache: + err = db.CronJobs.AddJob( + CronJobs_Name_GCCatalogCache, + db.Opts.CatalogCfg.GCInterval, + func(ctx context.Context) { + if db.Opts.CatalogCfg.DisableGC { + return + } + gcWaterMark := db.DiskCleaner.GetCleaner().GetScanWaterMark() + if gcWaterMark == nil { + return + } + db.Catalog.GCByTS(ctx, gcWaterMark.GetEnd()) + }, + 1, + ) + return + case CronJobs_Name_GCLogtail: + err = db.CronJobs.AddJob( + CronJobs_Name_GCLogtail, + db.Opts.CheckpointCfg.GCCheckpointInterval, + func(ctx context.Context) { + logutil.Info(db.Runtime.ExportLogtailStats()) + ckp := db.BGCheckpointRunner.MaxIncrementalCheckpoint() + if ckp != nil { + ts := types.BuildTS(ckp.GetStart().Physical(), 0) // GetStart is previous + 1, reset it here + db.LogtailMgr.GCByTS(ctx, ts) + } + }, + 1, + ) + return + case CronJobs_Name_GCLockMerge: + err = db.CronJobs.AddJob( + CronJobs_Name_GCLockMerge, + options.DefaultLockMergePruneInterval, + func(ctx context.Context) { + db.Runtime.LockMergeService.Prune() + }, + 1, + ) + return + case CronJobs_Name_ReportStats: + err = db.CronJobs.AddJob( + CronJobs_Name_ReportStats, + time.Second*10, + func(ctx context.Context) { + mpoolAllocatorSubTask() + }, + 1, + ) + return + case CronJobs_Name_Checker: + err = db.CronJobs.AddJob( + CronJobs_Name_Checker, + db.Opts.CheckpointCfg.MetadataCheckInterval, + func(ctx context.Context) { + db.Catalog.CheckMetadata() + }, + 1, + ) + return + } + err = moerr.NewInternalErrorNoCtxf( + "unknown cron job name: %s", name, + ) + return +} + +func RemoveCronJob(db *DB, name string) { + db.CronJobs.RemoveJob(name) +} + +func CheckCronJobs(db *DB, expectMode DBTxnMode) (err error) { + for name, spec := range CronJobs_Spec { + if (expectMode.IsWriteMode() && spec[1]) || (expectMode.IsReplayMode() && spec[3]) { + if job := db.CronJobs.GetJob(name); job == nil { + err = moerr.NewInternalErrorNoCtxf("missing cron job %s in %s mode", name, expectMode) + return + } + } + } + db.CronJobs.ForeachJob(func(name string, _ *tasks.CancelableJob) bool { + if spec, ok := CronJobs_Spec[name]; !ok { + err = moerr.NewInternalErrorNoCtxf("unknown cron job name: %s", name) + return false + } else { + if expectMode.IsWriteMode() { + if !spec[0] { + err = moerr.NewInternalErrorNoCtxf("invalid cron job %s in %s mode", name, expectMode) + return false + } + } else { + if !spec[2] { + err = moerr.NewInternalErrorNoCtxf("invalid cron job %s in %s mode", name, expectMode) + return false + } + } + } + return true + }) + return +} diff --git a/pkg/vm/engine/tae/db/db.go b/pkg/vm/engine/tae/db/db.go index 24038a0b1324e..82938a594bb67 100644 --- a/pkg/vm/engine/tae/db/db.go +++ b/pkg/vm/engine/tae/db/db.go @@ -70,6 +70,14 @@ func (m DBTxnMode) IsValid() bool { return m == DBTxnMode_Write || m == DBTxnMode_Replay } +func (m DBTxnMode) IsWriteMode() bool { + return m == DBTxnMode_Write +} + +func (m DBTxnMode) IsReplayMode() bool { + return m == DBTxnMode_Replay +} + type DBOption func(*DB) func WithTxnMode(mode DBTxnMode) DBOption { diff --git a/pkg/vm/engine/tae/db/gc/v3/checkpoint.go b/pkg/vm/engine/tae/db/gc/v3/checkpoint.go index 0f1ab1ad4fa81..9d9b132032a6a 100644 --- a/pkg/vm/engine/tae/db/gc/v3/checkpoint.go +++ 
b/pkg/vm/engine/tae/db/gc/v3/checkpoint.go @@ -258,9 +258,28 @@ func (c *checkpointCleaner) TaskNameLocked() string { return c.mutation.taskState.name } -func (c *checkpointCleaner) Replay() (err error) { +func (c *checkpointCleaner) Replay(inputCtx context.Context) (err error) { now := time.Now() + ctx, cancel := context.WithCancelCause(inputCtx) + defer cancel(nil) + go func() { + select { + case <-c.ctx.Done(): + cancel(context.Cause(c.ctx)) + case <-inputCtx.Done(): + cancel(context.Cause(inputCtx)) + case <-ctx.Done(): + } + }() + + select { + case <-ctx.Done(): + err = context.Cause(ctx) + return + default: + } + c.StartMutationTask("gc-replay") defer c.StopMutationTask() @@ -344,7 +363,7 @@ func (c *checkpointCleaner) Replay() (err error) { for _, dir := range gcMetaDirs { start := time.Now() window := NewGCWindow(c.mp, c.fs.Service) - err = window.ReadTable(c.ctx, GCMetaDir+dir.Name, c.fs.Service) + err = window.ReadTable(ctx, GCMetaDir+dir.Name, c.fs.Service) if err != nil { logger = logutil.Error } @@ -361,7 +380,7 @@ func (c *checkpointCleaner) Replay() (err error) { } if snapFile != "" { if err = c.mutation.snapshotMeta.ReadMeta( - c.ctx, GCMetaDir+snapFile, c.fs.Service, + ctx, GCMetaDir+snapFile, c.fs.Service, ); err != nil { return } @@ -377,7 +396,7 @@ func (c *checkpointCleaner) Replay() (err error) { c.updateCheckpointGCWaterMark(&end) var ckpData *logtail.CheckpointData - if ckpData, err = c.collectCkpData(compacted); err != nil { + if ckpData, err = c.collectCkpData(ctx, compacted); err != nil { logutil.Error( "GC-REPLAY-COLLECT-ERROR", zap.String("task", c.TaskNameLocked()), @@ -389,7 +408,7 @@ func (c *checkpointCleaner) Replay() (err error) { defer ckpData.Close() var snapshots map[uint32]containers.Vector var pitrs *logtail.PitrInfo - pitrs, err = c.GetPITRsLocked() + pitrs, err = c.GetPITRsLocked(ctx) if err != nil { logutil.Error("GC-REPLAY-GET-PITRS_ERROR", zap.String("task", c.TaskNameLocked()), @@ -708,6 +727,7 @@ func (c *checkpointCleaner) filterCheckpoints( } func (c *checkpointCleaner) mergeCheckpointFilesLocked( + ctx context.Context, checkpointLowWaterMark *types.TS, memoryBuffer *containers.OneSchemaBatchBuffer, accountSnapshots map[uint32][]types.TS, @@ -783,9 +803,9 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked( panic(fmt.Sprintf("checkpointMaxEnd %s < window end %s", checkpointMaxEnd.ToString(), window.tsRange.end.ToString())) } - sourcer := window.MakeFilesReader(c.ctx, c.fs.Service) + sourcer := window.MakeFilesReader(ctx, c.fs.Service) bf, err := BuildBloomfilter( - c.ctx, + ctx, Default_Coarse_EstimateRows, Default_Coarse_Probility, 0, @@ -795,7 +815,7 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked( ) if tmpDelFiles, tmpNewFiles, newCheckpoint, newCheckpointData, err = MergeCheckpoint( - c.ctx, + ctx, c.sid, c.fs.Service, toMergeEntries, @@ -850,7 +870,7 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked( } } if c.GCCheckpointEnabled() { - if err = c.fs.DelFiles(c.ctx, deleteFiles); err != nil { + if err = c.fs.DelFiles(ctx, deleteFiles); err != nil { extraErrMsg = "DelFiles failed" return err } @@ -872,10 +892,12 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked( } func (c *checkpointCleaner) collectCkpData( + ctx context.Context, ckp *checkpoint.CheckpointEntry, ) (data *logtail.CheckpointData, err error) { return logtail.GetCheckpointData( - c.ctx, c.sid, c.fs.Service, ckp.GetLocation(), ckp.GetVersion()) + ctx, c.sid, c.fs.Service, ckp.GetLocation(), ckp.GetVersion(), + ) } func (c *checkpointCleaner) 
GetPITRs() (*logtail.PitrInfo, error) { @@ -885,12 +907,12 @@ func (c *checkpointCleaner) GetPITRs() (*logtail.PitrInfo, error) { return c.mutation.snapshotMeta.GetPITR(c.ctx, c.sid, ts, c.fs.Service, c.mp) } -func (c *checkpointCleaner) GetPITRsLocked() (*logtail.PitrInfo, error) { +func (c *checkpointCleaner) GetPITRsLocked(ctx context.Context) (*logtail.PitrInfo, error) { ts := time.Now() - return c.mutation.snapshotMeta.GetPITR(c.ctx, c.sid, ts, c.fs.Service, c.mp) + return c.mutation.snapshotMeta.GetPITR(ctx, c.sid, ts, c.fs.Service, c.mp) } -func (c *checkpointCleaner) TryGC() (err error) { +func (c *checkpointCleaner) TryGC(inputCtx context.Context) (err error) { now := time.Now() c.StartMutationTask("gc-try-gc") defer c.StopMutationTask() @@ -902,9 +924,21 @@ func (c *checkpointCleaner) TryGC() (err error) { zap.Error(err), ) }() + ctx, cancel := context.WithCancelCause(inputCtx) + defer cancel(nil) + go func() { + select { + case <-c.ctx.Done(): + cancel(context.Cause(c.ctx)) + case <-inputCtx.Done(): + cancel(context.Cause(inputCtx)) + case <-ctx.Done(): + } + }() + memoryBuffer := MakeGCWindowBuffer(16 * mpool.MB) defer memoryBuffer.Close(c.mp) - err = c.tryGCLocked(memoryBuffer) + err = c.tryGCLocked(ctx, memoryBuffer) return } @@ -914,6 +948,7 @@ func (c *checkpointCleaner) TryGC() (err error) { // it will update the GC watermark and the checkpoint GC watermark // `mutation.scanned`: it will be GC'ed against the max global checkpoint. func (c *checkpointCleaner) tryGCLocked( + ctx context.Context, memoryBuffer *containers.OneSchemaBatchBuffer, ) (err error) { // 1. Quick check if GC is needed @@ -942,7 +977,7 @@ func (c *checkpointCleaner) tryGCLocked( } if err = c.tryGCAgainstGCKPLocked( - maxGlobalCKP, memoryBuffer, + ctx, maxGlobalCKP, memoryBuffer, ); err != nil { logutil.Error( "GC-TRY-GC-AGAINST-GCKP-ERROR", @@ -980,6 +1015,7 @@ func (c *checkpointCleaner) tryGCLocked( // `gckp` is the global checkpoint that needs to be GC'ed against // `memoryBuffer` is the buffer used to read the data of the GC window func (c *checkpointCleaner) tryGCAgainstGCKPLocked( + ctx context.Context, gckp *checkpoint.CheckpointEntry, memoryBuffer *containers.OneSchemaBatchBuffer, ) (err error) { @@ -997,19 +1033,19 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked( zap.String("extra-err-msg", extraErrMsg), ) }() - pitrs, err := c.GetPITRsLocked() + pitrs, err := c.GetPITRsLocked(ctx) if err != nil { extraErrMsg = "GetPITRs failed" return } - snapshots, err = c.mutation.snapshotMeta.GetSnapshot(c.ctx, c.sid, c.fs.Service, c.mp) + snapshots, err = c.mutation.snapshotMeta.GetSnapshot(ctx, c.sid, c.fs.Service, c.mp) if err != nil { extraErrMsg = "GetSnapshot failed" return } accountSnapshots := TransformToTSList(snapshots) filesToGC, err := c.doGCAgainstGlobalCheckpointLocked( - gckp, accountSnapshots, pitrs, memoryBuffer, + ctx, gckp, accountSnapshots, pitrs, memoryBuffer, ) if err != nil { extraErrMsg = "doGCAgainstGlobalCheckpointLocked failed" @@ -1018,7 +1054,7 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked( // Delete files after doGCAgainstGlobalCheckpointLocked // TODO:Requires Physical Removal Policy if err = c.deleter.DeleteMany( - c.ctx, + ctx, c.TaskNameLocked(), filesToGC, ); err != nil { @@ -1043,7 +1079,9 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked( if waterMark.GT(&scanMark) { waterMark = scanMark } - err = c.mergeCheckpointFilesLocked(&waterMark, memoryBuffer, accountSnapshots, pitrs, len(filesToGC)) + err = c.mergeCheckpointFilesLocked( + ctx, &waterMark, 
memoryBuffer, accountSnapshots, pitrs, len(filesToGC), + ) if err != nil { extraErrMsg = fmt.Sprintf("mergeCheckpointFilesLocked %v failed", waterMark.ToString()) } @@ -1053,6 +1091,7 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked( // at least one incremental checkpoint has been scanned // and the GC'ed water mark less than the global checkpoint func (c *checkpointCleaner) doGCAgainstGlobalCheckpointLocked( + ctx context.Context, gckp *checkpoint.CheckpointEntry, accountSnapshots map[uint32][]types.TS, pitrs *logtail.PitrInfo, @@ -1091,7 +1130,7 @@ func (c *checkpointCleaner) doGCAgainstGlobalCheckpointLocked( // Also, it will update the GC metadata scannedWindow := c.GetScannedWindowLocked() if filesToGC, metafile, err = scannedWindow.ExecuteGlobalCheckpointBasedGC( - c.ctx, + ctx, gckp, accountSnapshots, pitrs, @@ -1228,7 +1267,7 @@ func (c *checkpointCleaner) DoCheck() error { } defer logtail.CloseSnapshotList(snapshots) - pitr, err := c.GetPITRsLocked() + pitr, err := c.GetPITRsLocked(c.ctx) if err != nil { logutil.Error( "GC-GET-PITR-ERROR", @@ -1339,14 +1378,14 @@ func (c *checkpointCleaner) DoCheck() error { } for _, ckp := range debugCandidates { - data, err := c.collectCkpData(ckp) + data, err := c.collectCkpData(c.ctx, ckp) if err != nil { return err } collectObjectsFromCheckpointData(data, ickpObjects) } cptCkpObjects := make(map[string]*ObjectEntry, 0) - data, err := c.collectCkpData(compacted) + data, err := c.collectCkpData(c.ctx, compacted) if err != nil { return err } @@ -1374,7 +1413,7 @@ func (c *checkpointCleaner) DoCheck() error { return nil } -func (c *checkpointCleaner) Process() { +func (c *checkpointCleaner) Process(inputCtx context.Context) (err error) { if !c.GCEnabled() { return } @@ -1386,7 +1425,6 @@ func (c *checkpointCleaner) Process() { startScanWaterMark := c.GetScanWaterMark() startGCWaterMark := c.GetGCWaterMark() - var err error defer func() { endScanWaterMark := c.GetScanWaterMark() endGCWaterMark := c.GetGCWaterMark() @@ -1402,15 +1440,32 @@ func (c *checkpointCleaner) Process() { ) }() + ctx, cancel := context.WithCancelCause(inputCtx) + defer cancel(nil) + go func() { + select { + case <-ctx.Done(): + case <-c.ctx.Done(): + cancel(context.Cause(c.ctx)) + case <-inputCtx.Done(): + cancel(context.Cause(inputCtx)) + } + }() + + select { + case <-ctx.Done(): + return context.Cause(ctx) + default: + } + memoryBuffer := MakeGCWindowBuffer(16 * mpool.MB) defer memoryBuffer.Close(c.mp) - if err = c.tryScanLocked(memoryBuffer); err != nil { - return - } - if err := c.tryGCLocked(memoryBuffer); err != nil { + if err = c.tryScanLocked(ctx, memoryBuffer); err != nil { return } + err = c.tryGCLocked(ctx, memoryBuffer) + return } // tryScanLocked scans the incremental checkpoints and tries to create a new GC window @@ -1418,6 +1473,7 @@ func (c *checkpointCleaner) Process() { // it will update the scan watermark // it will save the snapshot meta and table info to the disk func (c *checkpointCleaner) tryScanLocked( + ctx context.Context, memoryBuffer *containers.OneSchemaBatchBuffer, ) (err error) { // get the max scanned timestamp @@ -1450,7 +1506,7 @@ func (c *checkpointCleaner) tryScanLocked( var newWindow *GCWindow var tmpNewFiles []string if newWindow, tmpNewFiles, err = c.scanCheckpointsLocked( - candidates, memoryBuffer, + ctx, candidates, memoryBuffer, ); err != nil { logutil.Error( "GC-SCAN-WINDOW-ERROR", @@ -1544,6 +1600,7 @@ func (c *checkpointCleaner) appendFilesToWAL(files ...string) error { // `c.mutation.snapshotMeta` // this function will 
save the snapshot meta and table info to the disk func (c *checkpointCleaner) scanCheckpointsLocked( + ctx context.Context, ckps []*checkpoint.CheckpointEntry, memoryBuffer *containers.OneSchemaBatchBuffer, ) (gcWindow *GCWindow, newFiles []string, err error) { @@ -1566,6 +1623,12 @@ func (c *checkpointCleaner) scanCheckpointsLocked( var snapshotFile, accountFile GCMetaFile newFiles = make([]string, 0, 3) saveSnapshot := func() (err2 error) { + select { + case <-ctx.Done(): + err2 = context.Cause(ctx) + return + default: + } name := blockio.EncodeSnapshotMetadataFileName( PrefixSnapMeta, ckps[0].GetStart(), @@ -1615,7 +1678,7 @@ func (c *checkpointCleaner) scanCheckpointsLocked( gcWindow = NewGCWindow(c.mp, c.fs.Service) var gcMetaFile string if gcMetaFile, err = gcWindow.ScanCheckpoints( - c.ctx, + ctx, ckps, c.collectCkpData, c.mutUpdateSnapshotMetaLocked, @@ -1629,12 +1692,14 @@ func (c *checkpointCleaner) scanCheckpointsLocked( newFiles = append(newFiles, gcWindow.metaDir+gcMetaFile) c.mutAddMetaFileLocked(snapshotFile.name, snapshotFile) c.mutAddMetaFileLocked(accountFile.name, accountFile) - c.mutAddMetaFileLocked(gcMetaFile, GCMetaFile{ - name: gcMetaFile, - start: gcWindow.tsRange.start, - end: gcWindow.tsRange.end, - ext: blockio.CheckpointExt, - }) + c.mutAddMetaFileLocked( + gcMetaFile, + GCMetaFile{ + name: gcMetaFile, + start: gcWindow.tsRange.start, + end: gcWindow.tsRange.end, + ext: blockio.CheckpointExt, + }) return } diff --git a/pkg/vm/engine/tae/db/gc/v3/cleaner_test.go b/pkg/vm/engine/tae/db/gc/v3/cleaner_test.go new file mode 100644 index 0000000000000..b925ff4812c0d --- /dev/null +++ b/pkg/vm/engine/tae/db/gc/v3/cleaner_test.go @@ -0,0 +1,196 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
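Replay, TryGC, and Process above all repeat the same idiom: derive a cancellable context from the caller's `inputCtx`, then spawn a goroutine that also cancels it (carrying the cause) when the cleaner's own `c.ctx` dies. A standalone sketch of that cancellation-merging pattern follows; `mergeCancel` is a hypothetical helper name — the patch inlines this logic in each method.

```go
// Sketch only: merge two cancellation sources into one derived context,
// preserving the cancellation cause (requires Go 1.20+ for context.Cause).
package main

import (
	"context"
	"errors"
	"fmt"
)

func mergeCancel(primary, secondary context.Context) (context.Context, context.CancelCauseFunc) {
	// ctx is already cancelled when primary is cancelled; the goroutine adds secondary.
	ctx, cancel := context.WithCancelCause(primary)
	go func() {
		select {
		case <-secondary.Done():
			// propagate the secondary cause into the derived context
			cancel(context.Cause(secondary))
		case <-ctx.Done():
			// derived context finished first; nothing to do
		}
	}()
	return ctx, cancel
}

func main() {
	cleanerCtx, stopCleaner := context.WithCancelCause(context.Background())
	callerCtx := context.Background()

	ctx, cancel := mergeCancel(callerCtx, cleanerCtx)
	defer cancel(nil)

	stopCleaner(errors.New("cleaner shutting down"))
	<-ctx.Done()
	fmt.Println(context.Cause(ctx)) // prints: cleaner shutting down
}
```

The effect is that a long-running GC task stops promptly whether the caller cancels (e.g. the controller's 10-minute timeout) or the cleaner itself is stopped, and the caller can inspect which cause interrupted it.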
+ +package gc + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/matrixorigin/matrixone/pkg/common/moerr" + "github.com/stretchr/testify/require" + "golang.org/x/exp/rand" +) + +func TestDiskCleaner_ReplayToWrite(t *testing.T) { + var ( + replayCnt int + executeCnt int + tryGC int + ) + + cleaner := NewMockCleaner( + WithTryGCFunc(func(context.Context) (err error) { + tryGC++ + return nil + }), + WithReplayFunc(func(context.Context) (err error) { + replayCnt++ + return nil + }), + WithProcessFunc(func(context.Context) (err error) { + time.Sleep(time.Millisecond * 2) + executeCnt++ + return nil + }), + ) + + diskCleaner := NewDiskCleaner(&cleaner, false) + require.True(t, diskCleaner.IsReplayMode()) + require.Equal(t, tryGC, 0) + require.Equal(t, replayCnt, 0) + require.Equal(t, executeCnt, 0) + + diskCleaner.Start() + err := diskCleaner.FlushQueue(context.Background()) + require.NoError(t, err) + require.Equal(t, tryGC, 0) + require.Equal(t, replayCnt, 1) + require.Equal(t, executeCnt, 0) + + err = diskCleaner.GC(context.Background()) + require.Error(t, err) + + err = diskCleaner.SwitchToWriteMode(context.Background()) + require.NoError(t, err) + require.True(t, diskCleaner.IsWriteMode()) + + err = diskCleaner.GC(context.Background()) + require.NoError(t, err) + err = diskCleaner.FlushQueue(context.Background()) + + require.NoError(t, err) + require.Equal(t, tryGC, 0) + require.Equal(t, replayCnt, 1) + require.Equal(t, executeCnt, 1) +} + +// write to replay mode +func TestDiskCleaner_WriteToReplay(t *testing.T) { + var ( + replayCnt int + executeCnt int + tryGC int + ) + cleaner := NewMockCleaner( + WithTryGCFunc(func(context.Context) (err error) { + tryGC++ + return nil + }), + WithReplayFunc(func(context.Context) (err error) { + replayCnt++ + return nil + }), + WithProcessFunc(func(context.Context) (err error) { + time.Sleep(time.Millisecond * 2) + executeCnt++ + return nil + }), + ) + + diskCleaner := NewDiskCleaner(&cleaner, true) + require.True(t, diskCleaner.IsWriteMode()) + require.Equal(t, tryGC, 0) + require.Equal(t, replayCnt, 0) + require.Equal(t, executeCnt, 0) + + diskCleaner.Start() + err := diskCleaner.FlushQueue(context.Background()) + require.NoError(t, err) + require.Equal(t, tryGC, 1) + require.Equal(t, replayCnt, 1) + require.Equal(t, executeCnt, 0) + + err = diskCleaner.GC(context.Background()) + require.NoError(t, err) + err = diskCleaner.FlushQueue(context.Background()) + require.NoError(t, err) + require.Equal(t, tryGC, 1) + require.Equal(t, replayCnt, 1) + require.Equal(t, executeCnt, 1) + + var wg sync.WaitGroup + wg.Add(1) + + WithProcessFunc(func(ctx context.Context) (err error) { + defer func() { + executeCnt++ + }() + if rand.Intn(10) > 5 { + wg.Wait() + } else { + select { + case <-ctx.Done(): + err = context.Cause(ctx) + t.Logf("%d: %v", executeCnt, err) + return + default: + } + time.Sleep(2 * time.Millisecond) + } + return nil + })(&cleaner) + + for i := 0; i < 10; i++ { + err = diskCleaner.GC(context.Background()) + require.NoError(t, err) + } + go func() { + time.Sleep(time.Millisecond * 4) + wg.Done() + }() + + err = diskCleaner.SwitchToReplayMode(context.Background()) + require.NoError(t, err) + require.Equal(t, executeCnt, 11) + require.True(t, diskCleaner.IsReplayMode()) + + WithProcessFunc(func(context.Context) (err error) { + executeCnt++ + return nil + })(&cleaner) + + err = diskCleaner.GC(context.Background()) + require.Error(t, err) + require.True(t, moerr.IsMoErrCode(err, moerr.ErrTxnControl)) +} + +func 
TestForMockCoverage(t *testing.T) { + var cleaner MockCleaner + ctx := context.Background() + require.NoError(t, cleaner.Replay(ctx)) + require.NoError(t, cleaner.Process(ctx)) + require.NoError(t, cleaner.TryGC(ctx)) + cleaner.AddChecker(nil, "") + require.Nil(t, cleaner.GetChecker("")) + require.Nil(t, cleaner.RemoveChecker("")) + require.Nil(t, cleaner.GetScanWaterMark()) + require.Nil(t, cleaner.GetCheckpointGCWaterMark()) + require.Nil(t, cleaner.GetScannedWindow()) + require.Nil(t, cleaner.GetMinMerged()) + require.Nil(t, cleaner.DoCheck()) + v1, v2 := cleaner.GetPITRs() + require.Nil(t, v1) + require.Nil(t, v2) + cleaner.GCEnabled() + require.Nil(t, cleaner.GetMPool()) + v3, v4 := cleaner.GetSnapshots() + require.Nil(t, v3) + require.Nil(t, v4) + require.Equal(t, "", cleaner.GetTablePK(0)) + + require.NoError(t, cleaner.Close()) +} diff --git a/pkg/vm/engine/tae/db/gc/v3/deleter.go b/pkg/vm/engine/tae/db/gc/v3/deleter.go index 2f104089f7bd4..dad88f0b07aa7 100644 --- a/pkg/vm/engine/tae/db/gc/v3/deleter.go +++ b/pkg/vm/engine/tae/db/gc/v3/deleter.go @@ -93,6 +93,12 @@ func (g *Deleter) DeleteMany( toDeletePaths := g.toDeletePaths for i := 0; i < cnt; i += g.deleteBatchSize { + select { + case <-ctx.Done(): + err = context.Cause(ctx) + return + default: + } end := i + g.deleteBatchSize if end > cnt { end = cnt diff --git a/pkg/vm/engine/tae/db/gc/v3/diskcleaner.go b/pkg/vm/engine/tae/db/gc/v3/diskcleaner.go index d40b1f9d99970..800f32e887d00 100644 --- a/pkg/vm/engine/tae/db/gc/v3/diskcleaner.go +++ b/pkg/vm/engine/tae/db/gc/v3/diskcleaner.go @@ -16,23 +16,59 @@ package gc import ( "context" + "fmt" "sync" + "sync/atomic" + "time" + "github.com/google/uuid" + "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/sm" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" + "go.uber.org/zap" ) +var CauseSwitchWrite2Replay = moerr.NewInternalErrorNoCtx("SwitchWrite2Replay") + const ( - MessgeReplay = iota - MessgeNormal + JT_GCNoop tasks.JobType = 300 + iota + JT_GCExecute + JT_GCReplay + JT_GCReplayAndExecute ) +func init() { + tasks.RegisterJobType(JT_GCNoop, "GCNoopJob") + tasks.RegisterJobType(JT_GCExecute, "GCExecute") + tasks.RegisterJobType(JT_GCReplay, "GCReplay") + tasks.RegisterJobType(JT_GCReplayAndExecute, "GCReplayAndExecute") +} + +type StateStep = uint32 + +const ( + StateStep_Write StateStep = iota + StateStep_Write2Replay + StateStep_Replay + StateStep_Replay2Write +) + +type runningCtx struct { + ctx context.Context + cancel context.CancelCauseFunc +} + // DiskCleaner is the main structure of v2 operation, // and provides "JobFactory" to let tae notify itself // to perform a v2 type DiskCleaner struct { cleaner Cleaner + step atomic.Uint32 + replayError atomic.Value + runningCtx atomic.Pointer[runningCtx] + processQueue sm.Queue onceStart sync.Once @@ -40,59 +76,293 @@ type DiskCleaner struct { } func NewDiskCleaner( - diskCleaner Cleaner, + diskCleaner Cleaner, isWriteMode bool, ) *DiskCleaner { cleaner := &DiskCleaner{ cleaner: diskCleaner, } - cleaner.processQueue = sm.NewSafeQueue(10000, 1000, cleaner.process) + if isWriteMode { + cleaner.step.Store(StateStep_Write) + } else { + cleaner.step.Store(StateStep_Replay) + } + + cleaner.processQueue = sm.NewSafeQueue(1000, 100, cleaner.process) return cleaner } func (cleaner *DiskCleaner) GC(ctx context.Context) (err error) { - logutil.Info("GC-Send-Intents") - return cleaner.tryClean(ctx) + return 
cleaner.scheduleGCJob(ctx) +} + +func (cleaner *DiskCleaner) IsWriteMode() bool { + return cleaner.step.Load() == StateStep_Write +} + +func (cleaner *DiskCleaner) IsReplayMode() bool { + return cleaner.step.Load() == StateStep_Replay +} + +func (cleaner *DiskCleaner) SwitchToWriteMode(ctx context.Context) (err error) { + oldStep := cleaner.step.Load() + switch oldStep { + case StateStep_Write: + return + case StateStep_Replay2Write, StateStep_Write2Replay: + err = moerr.NewTxnControlErrorNoCtxf("Bad cleaner state: %d", oldStep) + return + } + + if !cleaner.step.CompareAndSwap(oldStep, StateStep_Replay2Write) { + err = moerr.NewTxnControlErrorNoCtxf("Bad cleaner state: %d", oldStep) + return + } + + now := time.Now() + defer func() { + logger := logutil.Info + // any error occurs, switch back to StateStep_Write + if err != nil { + cleaner.step.Store(StateStep_Replay) + logger = logutil.Error + } + logger( + "GC-Switch2Write", + zap.Duration("duration", time.Since(now)), + zap.Error(err), + ) + }() + + cleaner.step.Store(StateStep_Write) + return +} + +func (cleaner *DiskCleaner) SwitchToReplayMode(ctx context.Context) (err error) { + oldStep := cleaner.step.Load() + switch oldStep { + case StateStep_Replay: + return + case StateStep_Replay2Write, StateStep_Write2Replay: + err = moerr.NewTxnControlErrorNoCtxf("Bad cleaner state: %d", oldStep) + return + } + + if !cleaner.step.CompareAndSwap(oldStep, StateStep_Write2Replay) { + err = moerr.NewTxnControlErrorNoCtxf("Bad cleaner state: %d", oldStep) + return + } + now := time.Now() + defer func() { + logger := logutil.Info + // any error occurs, switch back to StateStep_Write + if err != nil { + cleaner.step.Store(StateStep_Write) + logger = logutil.Error + } + logger( + "GC-Switch2Replay", + zap.Duration("duration", time.Since(now)), + zap.Error(err), + ) + }() + + cleaner.CancelRunning(CauseSwitchWrite2Replay) + + // the current state is StateStep_Write2Replay + if err = cleaner.FlushQueue(ctx); err != nil { + return + } + + cleaner.step.Store(StateStep_Replay) + return +} + +func (cleaner *DiskCleaner) FlushQueue( + ctx context.Context, +) (err error) { + var job *tasks.Job + if job, err = cleaner.addJob( + ctx, + JT_GCNoop, + func(context.Context) *tasks.JobResult { + result := new(tasks.JobResult) + return result + }, + ); err != nil { + return + } + job.WaitDone() + return +} + +func (cleaner *DiskCleaner) addJob( + ctx context.Context, jt tasks.JobType, execFn func(context.Context) *tasks.JobResult, +) (job *tasks.Job, err error) { + job = new(tasks.Job) + job.Init( + ctx, + uuid.Must(uuid.NewV7()).String(), + jt, + execFn, + ) + if _, err = cleaner.processQueue.Enqueue(job); err != nil { + job = nil + } + return } func (cleaner *DiskCleaner) GetCleaner() Cleaner { return cleaner.cleaner } -func (cleaner *DiskCleaner) tryReplay() { - if _, err := cleaner.processQueue.Enqueue(MessgeReplay); err != nil { - panic(err) +// should only be called during the startup +// no check for the current state +func (cleaner *DiskCleaner) forceScheduleJob(jt tasks.JobType) (err error) { + _, err = cleaner.processQueue.Enqueue(jt) + return +} + +// only can be executed in StateStep_Write +// otherwise, return moerr.NewTxnControlErrorNoCtxf("GC-Not-Write-Mode") +func (cleaner *DiskCleaner) scheduleGCJob(ctx context.Context) (err error) { + if step := cleaner.step.Load(); step != StateStep_Write { + err = moerr.NewTxnControlErrorNoCtxf("GC-Not-Write-Mode") + return } + logutil.Info("GC-Send-Intents") + _, err = cleaner.processQueue.Enqueue(JT_GCExecute) + 
return } -func (cleaner *DiskCleaner) tryClean(ctx context.Context) (err error) { - _, err = cleaner.processQueue.Enqueue(MessgeNormal) +// execute the GC job +// 1. it should be replayed first with no error +// 2. then execute the GC job +func (cleaner *DiskCleaner) doExecute(ctx context.Context) (err error) { + now := time.Now() + msg := "GC-Execute" + defer func() { + logger := logutil.Info + if err != nil { + logger = logutil.Error + } + logger( + msg, + zap.Duration("duration", time.Since(now)), + zap.Error(err), + ) + }() + var ok bool + if replayErr := cleaner.replayError.Load(); replayErr != nil { + if _, ok = replayErr.(error); ok { + if err = cleaner.cleaner.Replay(ctx); err != nil { + msg = "GC-Replay" + cleaner.replayError.Store(err) + return + } else { + cleaner.replayError.Store(0) + } + } + } + err = cleaner.cleaner.Process(ctx) return } -func (cleaner *DiskCleaner) replay() error { - return cleaner.cleaner.Replay() +// it will update the replayError after replay +func (cleaner *DiskCleaner) doReplay(ctx context.Context) (err error) { + if err = cleaner.cleaner.Replay(ctx); err != nil { + logutil.Error("GC-Replay-Error", zap.Error(err)) + cleaner.replayError.Store(err) + } else { + cleaner.replayError.Store(0) + } + return } -func (cleaner *DiskCleaner) process(items ...any) { - if items[0].(int) == MessgeReplay { - err := cleaner.replay() +func (cleaner *DiskCleaner) doReplayAndExecute(ctx context.Context) (err error) { + // defer func() { + // if err := recover(); err != nil { + // logutil.Error("GC-Replay-Panic", zap.Any("err", err)) + // } + // }() + msg := "GC-Replay" + now := time.Now() + defer func() { + logger := logutil.Info if err != nil { - panic(err) - } - cleaner.cleaner.TryGC() - if len(items) == 1 { - return + logger = logutil.Error } + logger( + msg, + zap.Duration("duration", time.Since(now)), + zap.Error(err), + ) + }() + if err = cleaner.doReplay(ctx); err != nil { + return } + msg = "GC-TryGC" + err = cleaner.cleaner.TryGC(ctx) + return +} + +func (cleaner *DiskCleaner) process(items ...any) { + for _, item := range items { + switch v := item.(type) { + case tasks.JobType: + ctx := cleaner.runningCtx.Load() + if ctx == nil { + ctx = new(runningCtx) + ctx.ctx, ctx.cancel = context.WithCancelCause(context.Background()) + cleaner.runningCtx.Store(ctx) + } + switch v { + case JT_GCReplay: + cleaner.doReplay(ctx.ctx) + case JT_GCReplayAndExecute: + cleaner.doReplayAndExecute(ctx.ctx) + case JT_GCExecute: + cleaner.doExecute(ctx.ctx) + default: + logutil.Error("GC-Unknown-JobType", zap.Any("job-type", v)) + } + case *tasks.Job: + // noop will reset the runningCtx + if v.Type() == JT_GCNoop { + runningCtx := new(runningCtx) + runningCtx.ctx, runningCtx.cancel = context.WithCancelCause(context.Background()) + if oldCtx := cleaner.runningCtx.Load(); oldCtx != nil { + oldCtx.cancel(nil) + } - cleaner.cleaner.Process() + cleaner.runningCtx.Store(runningCtx) + } + v.Run() + } + } +} +func (cleaner *DiskCleaner) CancelRunning(cause error) { + if ctx := cleaner.runningCtx.Load(); ctx != nil { + ctx.cancel(cause) + } } func (cleaner *DiskCleaner) Start() { cleaner.onceStart.Do(func() { cleaner.processQueue.Start() - cleaner.tryReplay() + step := cleaner.step.Load() + switch step { + case StateStep_Write: + if err := cleaner.forceScheduleJob(JT_GCReplayAndExecute); err != nil { + panic(err) + } + case StateStep_Replay: + if err := cleaner.forceScheduleJob(JT_GCReplay); err != nil { + panic(err) + } + default: + panic(fmt.Sprintf("Bad cleaner state: %d", step)) + } }) } 
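FlushQueue above drains the DiskCleaner's serial queue by enqueueing a no-op job and blocking on WaitDone: because a single consumer processes items in order, the barrier completing proves every earlier GC/replay job has already run. A toy illustration of that barrier trick, using plain channels instead of the internal tasks/sm packages, is shown below; all names here are illustrative.

```go
// Sketch only: flush a single-consumer work queue with a no-op barrier job.
package main

import "fmt"

type job struct {
	run  func()
	done chan struct{}
}

// worker is the single consumer, analogous to DiskCleaner.process.
func worker(queue <-chan job) {
	for j := range queue {
		j.run()
		close(j.done)
	}
}

// flush enqueues a no-op job and waits for it; everything enqueued
// before the barrier is guaranteed to have been processed.
func flush(queue chan<- job) {
	barrier := job{run: func() {}, done: make(chan struct{})}
	queue <- barrier
	<-barrier.done
}

func main() {
	queue := make(chan job, 16)
	go worker(queue)

	for i := 0; i < 3; i++ {
		i := i
		queue <- job{run: func() { fmt.Println("gc job", i) }, done: make(chan struct{})}
	}
	flush(queue)
	fmt.Println("queue drained")
}
```

This is also why SwitchToReplayMode first calls CancelRunning and then FlushQueue: in-flight jobs observe the cancelled running context, and the barrier confirms the queue is empty before the step flips to replay.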
@@ -100,5 +370,9 @@ func (cleaner *DiskCleaner) Stop() { cleaner.onceStop.Do(func() { cleaner.processQueue.Stop() cleaner.cleaner.Stop() + logutil.Info( + "GC-DiskCleaner-Started", + zap.Uint32("step", cleaner.step.Load()), + ) }) } diff --git a/pkg/vm/engine/tae/db/gc/v3/executor.go b/pkg/vm/engine/tae/db/gc/v3/executor.go index 89f3d5634e93b..73199da1f7865 100644 --- a/pkg/vm/engine/tae/db/gc/v3/executor.go +++ b/pkg/vm/engine/tae/db/gc/v3/executor.go @@ -52,6 +52,11 @@ func BuildBloomfilter( var done bool for { bat.CleanOnlyData() + select { + case <-ctx.Done(): + return nil, context.Cause(ctx) + default: + } if done, err = sourcer(ctx, bat.Attrs, nil, mp, bat); err != nil { return } @@ -110,6 +115,11 @@ func (exec *GCExecutor) doFilter( for { bat.CleanOnlyData() canGCBat.CleanOnlyData() + select { + case <-ctx.Done(): + return context.Cause(ctx) + default: + } // 1. get next batch from sourcer done, err := sourcer(ctx, bat.Attrs, nil, exec.mp, bat) if err != nil { diff --git a/pkg/vm/engine/tae/db/gc/v3/merge.go b/pkg/vm/engine/tae/db/gc/v3/merge.go index 1ebf03dc0237e..5d0aa85dd20f2 100644 --- a/pkg/vm/engine/tae/db/gc/v3/merge.go +++ b/pkg/vm/engine/tae/db/gc/v3/merge.go @@ -16,12 +16,13 @@ package gc import ( "context" + "strings" + "github.com/matrixorigin/matrixone/pkg/common/bloomfilter" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/mergesort" "go.uber.org/zap" - "strings" "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/types" @@ -55,13 +56,25 @@ func MergeCheckpoint( datas := make([]*logtail.CheckpointData, 0) deleteFiles = make([]string, 0) for _, ckpEntry := range ckpEntries { + select { + case <-ctx.Done(): + err = context.Cause(ctx) + return + default: + } logutil.Info("[MergeCheckpoint]", zap.String("checkpoint", ckpEntry.String())) var data *logtail.CheckpointData var locations map[string]objectio.Location - _, data, err = logtail.LoadCheckpointEntriesFromKey(context.Background(), sid, fs, - ckpEntry.GetLocation(), ckpEntry.GetVersion(), nil, &types.TS{}) - if err != nil { + if _, data, err = logtail.LoadCheckpointEntriesFromKey( + ctx, + sid, + fs, + ckpEntry.GetLocation(), + ckpEntry.GetVersion(), + nil, + &types.TS{}, + ); err != nil { return } datas = append(datas, data) @@ -81,7 +94,8 @@ func MergeCheckpoint( // add checkpoint idx file to deleteFiles deleteFiles = append(deleteFiles, ckpEntry.GetLocation().Name().String()) locations, err = logtail.LoadCheckpointLocations( - ctx, sid, ckpEntry.GetTNLocation(), ckpEntry.GetVersion(), fs) + ctx, sid, ckpEntry.GetTNLocation(), ckpEntry.GetVersion(), fs, + ) if err != nil { if moerr.IsMoErrCode(err, moerr.ErrFileNotFound) { deleteFiles = append(deleteFiles, nameMeta) @@ -107,6 +121,12 @@ func MergeCheckpoint( // merge objects referenced by sansphot and pitr for _, data := range datas { + select { + case <-ctx.Done(): + err = context.Cause(ctx) + return + default: + } ins := data.GetObjectBatchs() tombstone := data.GetTombstoneObjectBatchs() bf.Test(ins.GetVectorByName(catalog.ObjectAttr_ObjectStats).GetDownstreamVector(), diff --git a/pkg/vm/engine/tae/db/gc/v3/mock_cleaner.go b/pkg/vm/engine/tae/db/gc/v3/mock_cleaner.go new file mode 100644 index 0000000000000..0a52223ad2754 --- /dev/null +++ b/pkg/vm/engine/tae/db/gc/v3/mock_cleaner.go @@ -0,0 +1,148 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the 
"License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gc + +import ( + "context" + + "github.com/matrixorigin/matrixone/pkg/common/mpool" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" +) + +type MockCleanerOption func(*MockCleaner) + +func WithReplayFunc(f func(context.Context) error) MockCleanerOption { + return func(c *MockCleaner) { + c.repalyFunc = f + } +} + +func WithProcessFunc(f func(context.Context) error) MockCleanerOption { + return func(c *MockCleaner) { + c.processFunc = f + } +} + +func WithTryGCFunc(f func(context.Context) error) MockCleanerOption { + return func(c *MockCleaner) { + c.tryGC = f + } +} + +type MockCleaner struct { + repalyFunc func(context.Context) error + processFunc func(context.Context) error + tryGC func(context.Context) error +} + +func NewMockCleaner(opts ...MockCleanerOption) MockCleaner { + cleaner := MockCleaner{} + for _, opt := range opts { + opt(&cleaner) + } + return cleaner +} + +func (c *MockCleaner) Replay(ctx context.Context) error { + if c.repalyFunc != nil { + return c.repalyFunc(ctx) + } + return nil +} + +func (c *MockCleaner) Process(ctx context.Context) error { + if c.processFunc != nil { + return c.processFunc(ctx) + } + return nil +} + +func (c *MockCleaner) TryGC(ctx context.Context) error { + if c.tryGC != nil { + return c.tryGC(ctx) + } + return nil +} + +func (c *MockCleaner) AddChecker(checker func(item any) bool, key string) int { + return 0 +} + +func (c *MockCleaner) Close() error { + return nil +} + +func (c *MockCleaner) GetChecker(key string) func(item any) bool { + return nil +} + +func (c *MockCleaner) RemoveChecker(key string) error { + return nil +} + +func (c *MockCleaner) GetScanWaterMark() *checkpoint.CheckpointEntry { + return nil +} + +func (c *MockCleaner) GetCheckpointGCWaterMark() *types.TS { + return nil +} + +func (c *MockCleaner) GetScannedWindow() *GCWindow { + return nil +} + +func (c *MockCleaner) Stop() { +} + +func (c *MockCleaner) GetMinMerged() *checkpoint.CheckpointEntry { + return nil +} + +func (c *MockCleaner) DoCheck() error { + return nil +} + +func (c *MockCleaner) GetPITRs() (*logtail.PitrInfo, error) { + return nil, nil +} + +func (c *MockCleaner) SetTid(tid uint64) { +} + +func (c *MockCleaner) EnableGC() { +} + +func (c *MockCleaner) DisableGC() { +} + +func (c *MockCleaner) GCEnabled() bool { + return false +} + +func (c *MockCleaner) GetMPool() *mpool.MPool { + return nil +} + +func (c *MockCleaner) GetSnapshots() (map[uint32]containers.Vector, error) { + return nil, nil +} + +func (c *MockCleaner) GetTablePK(tableId uint64) string { + return "" +} diff --git a/pkg/vm/engine/tae/db/gc/v3/types.go b/pkg/vm/engine/tae/db/gc/v3/types.go index e4e3d1dde53b6..237d26433b4c8 100644 --- a/pkg/vm/engine/tae/db/gc/v3/types.go +++ b/pkg/vm/engine/tae/db/gc/v3/types.go @@ -15,6 +15,7 @@ package gc import ( + "context" "fmt" 
"github.com/matrixorigin/matrixone/pkg/common/mpool" @@ -140,9 +141,9 @@ var ( ) type Cleaner interface { - Replay() error - Process() - TryGC() error + Replay(context.Context) error + Process(context.Context) error + TryGC(context.Context) error AddChecker(checker func(item any) bool, key string) int RemoveChecker(key string) error GetScanWaterMark() *checkpoint.CheckpointEntry diff --git a/pkg/vm/engine/tae/db/gc/v3/window.go b/pkg/vm/engine/tae/db/gc/v3/window.go index b7bc0c0097340..10f59b2d74b25 100644 --- a/pkg/vm/engine/tae/db/gc/v3/window.go +++ b/pkg/vm/engine/tae/db/gc/v3/window.go @@ -174,7 +174,7 @@ func (w *GCWindow) ExecuteGlobalCheckpointBasedGC( func (w *GCWindow) ScanCheckpoints( ctx context.Context, checkpointEntries []*checkpoint.CheckpointEntry, - collectCkpData func(*checkpoint.CheckpointEntry) (*logtail.CheckpointData, error), + collectCkpData func(context.Context, *checkpoint.CheckpointEntry) (*logtail.CheckpointData, error), processCkpData func(*checkpoint.CheckpointEntry, *logtail.CheckpointData) error, onScanDone func() error, buffer *containers.OneSchemaBatchBuffer, @@ -185,10 +185,15 @@ func (w *GCWindow) ScanCheckpoints( start := checkpointEntries[0].GetStart() end := checkpointEntries[len(checkpointEntries)-1].GetEnd() getOneBatch := func(cxt context.Context, bat *batch.Batch, mp *mpool.MPool) (bool, error) { + select { + case <-cxt.Done(): + return false, context.Cause(cxt) + default: + } if len(checkpointEntries) == 0 { return true, nil } - data, err := collectCkpData(checkpointEntries[0]) + data, err := collectCkpData(ctx, checkpointEntries[0]) if err != nil { return false, err } @@ -265,6 +270,11 @@ func (w *GCWindow) writeMetaForRemainings( ctx context.Context, stats []objectio.ObjectStats, ) (string, error) { + select { + case <-ctx.Done(): + return "", context.Cause(ctx) + default: + } name := blockio.EncodeGCMetadataFileName(PrefixGCMeta, w.tsRange.start, w.tsRange.end) ret := batch.NewWithSchema( false, ObjectTableMetaAttrs, ObjectTableMetaTypes, @@ -460,6 +470,12 @@ func (w *GCWindow) replayData( // ReadTable reads an s3 file and replays a GCWindow in memory func (w *GCWindow) ReadTable(ctx context.Context, name string, fs fileservice.FileService) error { + select { + case <-ctx.Done(): + return context.Cause(ctx) + default: + } + var release1 func() var buffer *batch.Batch defer func() { diff --git a/pkg/vm/engine/tae/db/open.go b/pkg/vm/engine/tae/db/open.go index 0b6b91d7faef4..7177c9ecff697 100644 --- a/pkg/vm/engine/tae/db/open.go +++ b/pkg/vm/engine/tae/db/open.go @@ -277,7 +277,8 @@ func Open( gc2.SetDeleteBatchSize(opts.GCCfg.GCDeleteBatchSize) // sjw TODO: cleaner need to support replay and write mode - cleaner := gc2.NewCheckpointCleaner(opts.Ctx, + cleaner := gc2.NewCheckpointCleaner( + opts.Ctx, opts.SID, fs, db.BGCheckpointRunner, gc2.WithCanGCCacheSize(opts.GCCfg.CacheSize), gc2.WithMaxMergeCheckpointCount(opts.GCCfg.GCMergeCount), @@ -292,104 +293,14 @@ func Open( endTS := checkpoint.GetEnd() return !endTS.GE(&ts) }, cmd_util.CheckerKeyTTL) - db.DiskCleaner = gc2.NewDiskCleaner(cleaner) + + db.DiskCleaner = gc2.NewDiskCleaner(cleaner, db.IsWriteMode()) db.DiskCleaner.Start() db.CronJobs = tasks.NewCancelableJobs() - db.CronJobs.AddJob( - "GC-Transfer-Table", - opts.CheckpointCfg.TransferInterval, - func(ctx context.Context) { - db.Runtime.PoolUsageReport() - db.Runtime.TransferDelsMap.Prune(opts.TransferTableTTL) - transferTable.RunTTL() - }, - 1, - ) - db.CronJobs.AddJob( - "GC-Disk", - opts.GCCfg.ScanGCInterval, - func(ctx 
context.Context) { - db.DiskCleaner.GC(ctx) - }, - 1, - ) - db.CronJobs.AddJob( - "GC-Checkpoint", - opts.CheckpointCfg.GCCheckpointInterval, - func(ctx context.Context) { - if opts.CheckpointCfg.DisableGCCheckpoint { - return - } - gcWaterMark := db.DiskCleaner.GetCleaner().GetCheckpointGCWaterMark() - if gcWaterMark == nil { - return - } - if err := db.BGCheckpointRunner.GCByTS(ctx, *gcWaterMark); err != nil { - logutil.Error( - "GC-Checkpoint-Err", - zap.Error(err), - ) - } - }, - 1, - ) - db.CronJobs.AddJob( - "GC-Catalog-Cache", - opts.CatalogCfg.GCInterval, - func(ctx context.Context) { - if opts.CatalogCfg.DisableGC { - return - } - gcWaterMark := db.DiskCleaner.GetCleaner().GetScanWaterMark() - if gcWaterMark == nil { - return - } - db.Catalog.GCByTS(ctx, gcWaterMark.GetEnd()) - }, - 1, - ) - db.CronJobs.AddJob( - "GC-Logtail", - opts.CheckpointCfg.GCCheckpointInterval, - func(ctx context.Context) { - logutil.Info(db.Runtime.ExportLogtailStats()) - ckp := db.BGCheckpointRunner.MaxIncrementalCheckpoint() - if ckp != nil { - ts := types.BuildTS(ckp.GetStart().Physical(), 0) // GetStart is previous + 1, reset it here - db.LogtailMgr.GCByTS(ctx, ts) - } - }, - 1, - ) - db.CronJobs.AddJob( - "GC-LockMerge", - options.DefaultLockMergePruneInterval, - func(ctx context.Context) { - db.Runtime.LockMergeService.Prune() - }, - 1, - ) - - db.CronJobs.AddJob( - "REPORT-MPOOL-STATS", - time.Second*10, - func(ctx context.Context) { - mpoolAllocatorSubTask() - }, - 1, - ) - - if opts.CheckpointCfg.MetadataCheckInterval != 0 { - db.CronJobs.AddJob( - "META-CHECK", - opts.CheckpointCfg.MetadataCheckInterval, - func(ctx context.Context) { - db.Catalog.CheckMetadata() - }, - 1, - ) + if err = AddCronJobs(db); err != nil { + return } db.Controller = NewController(db) diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index f310431d72a6d..9e5734ec2fe41 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -5660,7 +5660,7 @@ func TestGCWithCheckpoint(t *testing.T) { tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) defer tae.Close() cleaner := gc.NewCheckpointCleaner(context.Background(), "", tae.Runtime.Fs, tae.BGCheckpointRunner) - manager := gc.NewDiskCleaner(cleaner) + manager := gc.NewDiskCleaner(cleaner, true) manager.Start() defer manager.Stop() @@ -5697,7 +5697,7 @@ func TestGCWithCheckpoint(t *testing.T) { maxEnd := manager.GetCleaner().GetScanWaterMark().GetEnd() assert.True(t, end.Equal(&maxEnd)) cleaner2 := gc.NewCheckpointCleaner(context.Background(), "", tae.Runtime.Fs, tae.BGCheckpointRunner) - manager2 := gc.NewDiskCleaner(cleaner2) + manager2 := gc.NewDiskCleaner(cleaner2, true) manager2.Start() defer manager2.Stop() testutils.WaitExpect(5000, func() bool { @@ -5730,7 +5730,7 @@ func TestGCDropDB(t *testing.T) { tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) defer tae.Close() cleaner := gc.NewCheckpointCleaner(context.Background(), "", tae.Runtime.Fs, tae.BGCheckpointRunner) - manager := gc.NewDiskCleaner(cleaner) + manager := gc.NewDiskCleaner(cleaner, true) manager.Start() defer manager.Stop() schema := catalog.MockSchemaAll(3, 1) @@ -5770,7 +5770,7 @@ func TestGCDropDB(t *testing.T) { maxEnd := manager.GetCleaner().GetScanWaterMark().GetEnd() assert.True(t, end.Equal(&maxEnd)) cleaner2 := gc.NewCheckpointCleaner(context.Background(), "", tae.Runtime.Fs, tae.BGCheckpointRunner) - manager2 := gc.NewDiskCleaner(cleaner2) + manager2 := gc.NewDiskCleaner(cleaner2, true) manager2.Start() defer 
manager2.Stop() testutils.WaitExpect(5000, func() bool { @@ -5804,7 +5804,7 @@ func TestGCDropTable(t *testing.T) { tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) defer tae.Close() cleaner := gc.NewCheckpointCleaner(context.Background(), "", tae.Runtime.Fs, tae.BGCheckpointRunner) - manager := gc.NewDiskCleaner(cleaner) + manager := gc.NewDiskCleaner(cleaner, true) manager.Start() defer manager.Stop() schema := catalog.MockSchemaAll(3, 1) @@ -5859,7 +5859,7 @@ func TestGCDropTable(t *testing.T) { maxEnd := manager.GetCleaner().GetScanWaterMark().GetEnd() assert.True(t, end.Equal(&maxEnd)) cleaner2 := gc.NewCheckpointCleaner(context.Background(), "", tae.Runtime.Fs, tae.BGCheckpointRunner) - manager2 := gc.NewDiskCleaner(cleaner2) + manager2 := gc.NewDiskCleaner(cleaner2, true) manager2.Start() defer manager2.Stop() testutils.WaitExpect(5000, func() bool { @@ -10531,13 +10531,22 @@ func Test_BasicTxnModeSwitch(t *testing.T) { assert.NoError(t, err) assert.True(t, tae.IsWriteMode()) assert.True(t, tae.TxnMgr.IsWriteMode()) + assert.Error(t, db.CheckCronJobs(tae.DB, db.DBTxnMode_Replay)) } func Test_OpenReplayDB1(t *testing.T) { ctx := context.Background() opts := config.WithLongScanAndCKPOpts(nil) + opts.CheckpointCfg.MetadataCheckInterval = time.Millisecond * 2 tae := testutil.NewReplayTestEngine(ctx, ModuleName, t, opts) + defer tae.Close() assert.True(t, tae.IsReplayMode()) assert.True(t, tae.TxnMgr.IsReplayMode()) - defer tae.Close() + for name, spec := range db.CronJobs_Spec { + if !spec[2] { + assert.Error(t, db.AddCronJob(tae.DB, name, false)) + } + } + assert.Error(t, db.AddCronJob(tae.DB, "unknown", false)) + assert.Error(t, db.CheckCronJobs(tae.DB, db.DBTxnMode_Write)) } diff --git a/pkg/vm/engine/tae/db/testutil/engine.go b/pkg/vm/engine/tae/db/testutil/engine.go index 27f48af07d2af..f04e84eaafceb 100644 --- a/pkg/vm/engine/tae/db/testutil/engine.go +++ b/pkg/vm/engine/tae/db/testutil/engine.go @@ -377,17 +377,23 @@ func InitTestDBWithDir( t *testing.T, opts *options.Options, ) *db.DB { - db, _ := db.Open(ctx, dir, opts) + var ( + err error + tae *db.DB + ) + if tae, err = db.Open(ctx, dir, opts); err != nil { + panic(err) + } // only ut executes this checker - db.DiskCleaner.GetCleaner().AddChecker( + tae.DiskCleaner.GetCleaner().AddChecker( func(item any) bool { - min := db.TxnMgr.MinTSForTest() + min := tae.TxnMgr.MinTSForTest() ckp := item.(*checkpoint.CheckpointEntry) //logutil.Infof("min: %v, checkpoint: %v", min.ToString(), checkpoint.GetStart().ToString()) end := ckp.GetEnd() return !end.GE(&min) }, cmd_util.CheckerKeyMinTS) - return db + return tae } func InitTestDB( diff --git a/pkg/vm/engine/tae/logtail/backup.go b/pkg/vm/engine/tae/logtail/backup.go index 5fdde61ec4535..c3ea89fed9bcf 100644 --- a/pkg/vm/engine/tae/logtail/backup.go +++ b/pkg/vm/engine/tae/logtail/backup.go @@ -274,6 +274,12 @@ func GetCheckpointData( location objectio.Location, version uint32, ) (*CheckpointData, error) { + select { + case <-ctx.Done(): + return nil, context.Cause(ctx) + default: + } + data := NewCheckpointData(sid, common.CheckpointAllocator) reader, err := blockio.NewObjectReader(sid, fs, location) if err != nil { diff --git a/pkg/vm/engine/tae/logtail/snapshot.go b/pkg/vm/engine/tae/logtail/snapshot.go index 3e9d7d334f4f4..596a6adb6aada 100644 --- a/pkg/vm/engine/tae/logtail/snapshot.go +++ b/pkg/vm/engine/tae/logtail/snapshot.go @@ -759,6 +759,11 @@ func (sm *SnapshotMeta) GetSnapshot( snapshotSchemaTypes[ColObjId], } for tid, objectMap := range objects { + select { + case 
<-ctx.Done(): + return nil, context.Cause(ctx) + default: + } tombstonesStats := make([]objectio.ObjectStats, 0) for ttid, tombstoneMap := range tombstones { if ttid != tid { @@ -873,6 +878,11 @@ func (sm *SnapshotMeta) GetPITR( tables: make(map[uint64]types.TS), } for _, object := range sm.pitr.objects { + select { + case <-ctx.Done(): + return nil, context.Cause(ctx) + default: + } location := object.stats.ObjectLocation() name := object.stats.ObjectName() for i := uint32(0); i < object.stats.BlkCnt(); i++ { @@ -1265,6 +1275,12 @@ func (sm *SnapshotMeta) Rebuild( } func (sm *SnapshotMeta) ReadMeta(ctx context.Context, name string, fs fileservice.FileService) error { + select { + case <-ctx.Done(): + return context.Cause(ctx) + default: + } + reader, err := blockio.NewFileReaderNoCache(fs, name) if err != nil { return err diff --git a/pkg/vm/engine/tae/logtail/utils.go b/pkg/vm/engine/tae/logtail/utils.go index e8edc42c39d62..37523aa19081d 100644 --- a/pkg/vm/engine/tae/logtail/utils.go +++ b/pkg/vm/engine/tae/logtail/utils.go @@ -1377,6 +1377,11 @@ func LoadCheckpointLocations( version uint32, fs fileservice.FileService, ) (map[string]objectio.Location, error) { + select { + case <-ctx.Done(): + return nil, context.Cause(ctx) + default: + } var err error data := NewCheckpointData(sid, common.CheckpointAllocator) defer data.Close() diff --git a/pkg/vm/engine/tae/tasks/cancelablejob.go b/pkg/vm/engine/tae/tasks/cancelablejob.go index 61ac3f97f4fe1..daa8ae661ff7d 100644 --- a/pkg/vm/engine/tae/tasks/cancelablejob.go +++ b/pkg/vm/engine/tae/tasks/cancelablejob.go @@ -62,6 +62,12 @@ func (jobs *CancelableJobs) AddJob( return nil } +func (jobs *CancelableJobs) GetJob(name string) *CancelableJob { + jobs.RLock() + defer jobs.RUnlock() + return jobs.cronJobs[name] +} + func (jobs *CancelableJobs) RemoveJob(name string) { jobs.Lock() defer jobs.Unlock() @@ -86,6 +92,16 @@ func (jobs *CancelableJobs) Reset() { jobs.cronJobs = make(map[string]*CancelableJob) } +func (jobs *CancelableJobs) ForeachJob(fn func(string, *CancelableJob) bool) { + jobs.RLock() + defer jobs.RUnlock() + for name, job := range jobs.cronJobs { + if !fn(name, job) { + break + } + } +} + type CancelableFunc = func(context.Context) type CancelableJob struct {
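Across DeleteMany, MergeCheckpoint, GetSnapshot, GetPITR, ReadMeta, and LoadCheckpointLocations, the patch adds the same cooperative-cancellation check at the top of each loop iteration or entry point: a non-blocking select on ctx.Done() that returns context.Cause, so callers see the real interruption reason (for example CauseSwitchWrite2Replay) rather than a bare context.Canceled. A minimal sketch of that idiom, with a hypothetical `processBatches` stand-in, is shown below.

```go
// Sketch only: check for cancellation between batches and surface the cause.
package main

import (
	"context"
	"errors"
	"fmt"
)

func processBatches(ctx context.Context, batches []string) error {
	for _, b := range batches {
		select {
		case <-ctx.Done():
			// return the cancellation cause, not just context.Canceled
			return context.Cause(ctx)
		default:
		}
		fmt.Println("processing", b)
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(errors.New("SwitchWrite2Replay"))
	if err := processBatches(ctx, []string{"a", "b"}); err != nil {
		fmt.Println("stopped:", err)
	}
}
```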