From fa228feb8d5712d6aad75d317fb1761c18216339 Mon Sep 17 00:00:00 2001 From: Wei Ziran Date: Wed, 18 Dec 2024 12:23:46 +0800 Subject: [PATCH] align merge with main (#20488) align merge with main Approved by: @aressu1985, @XuPeng-SH, @zhangxu19830126, @sukki37 --- pkg/common/moerr/cause.go | 1 + pkg/taskservice/mysql_task_storage.go | 4 +- pkg/vm/engine/disttae/txn_table.go | 8 +- pkg/vm/engine/tae/catalog/table.go | 47 ++- pkg/vm/engine/tae/common/stats.go | 2 +- pkg/vm/engine/tae/db/db.go | 2 - pkg/vm/engine/tae/db/merge/cnScheduler.go | 154 ++++++++ .../engine/tae/db/merge/cnScheduler_test.go | 117 +++++++ pkg/vm/engine/tae/db/merge/config.go | 3 +- pkg/vm/engine/tae/db/merge/config_test.go | 4 +- pkg/vm/engine/tae/db/merge/executor.go | 299 ++++------------ pkg/vm/engine/tae/db/merge/meminfo_darwin.go | 33 ++ pkg/vm/engine/tae/db/merge/meminfo_linux.go | 26 ++ pkg/vm/engine/tae/db/merge/mod.go | 216 ------------ pkg/vm/engine/tae/db/merge/mod_test.go | 57 --- pkg/vm/engine/tae/db/merge/policyCompact.go | 126 ++++--- .../merge/{policyBasic.go => policyGroup.go} | 171 +-------- pkg/vm/engine/tae/db/merge/policyOverlap.go | 203 +++++------ pkg/vm/engine/tae/db/merge/policyTombstone.go | 12 +- pkg/vm/engine/tae/db/merge/policy_test.go | 292 ++++++++++------ pkg/vm/engine/tae/db/merge/scheduler.go | 57 +-- pkg/vm/engine/tae/db/merge/scheduler_test.go | 11 +- pkg/vm/engine/tae/db/merge/utils.go | 328 +++++++++++++++++- pkg/vm/engine/tae/db/merge/utils_test.go | 141 ++++++++ pkg/vm/engine/tae/db/open.go | 11 +- pkg/vm/engine/tae/db/test/db_test.go | 2 +- pkg/vm/engine/tae/index/zm.go | 2 +- pkg/vm/engine/tae/mergesort/merger.go | 79 ++--- pkg/vm/engine/tae/mergesort/task.go | 123 ++++--- pkg/vm/engine/tae/rpc/handle_debug.go | 2 +- pkg/vm/engine/tae/rpc/handle_test.go | 6 +- pkg/vm/engine/tae/rpc/inspect.go | 15 +- pkg/vm/engine/tae/tables/jobs/mergeobjects.go | 4 +- .../tae/tables/txnentries/mergeobjects.go | 6 +- pkg/vm/engine/tae/tasks/worker/worker.go | 15 +- pkg/vm/engine/tae/tasks/worker/worker_test.go | 75 ++++ .../cases/function/mo_ctl/mo_ctl_merge.result | 2 +- 37 files changed, 1520 insertions(+), 1136 deletions(-) create mode 100644 pkg/vm/engine/tae/db/merge/cnScheduler.go create mode 100644 pkg/vm/engine/tae/db/merge/cnScheduler_test.go create mode 100644 pkg/vm/engine/tae/db/merge/meminfo_darwin.go create mode 100644 pkg/vm/engine/tae/db/merge/meminfo_linux.go delete mode 100644 pkg/vm/engine/tae/db/merge/mod.go delete mode 100644 pkg/vm/engine/tae/db/merge/mod_test.go rename pkg/vm/engine/tae/db/merge/{policyBasic.go => policyGroup.go} (55%) create mode 100644 pkg/vm/engine/tae/db/merge/utils_test.go create mode 100644 pkg/vm/engine/tae/tasks/worker/worker_test.go diff --git a/pkg/common/moerr/cause.go b/pkg/common/moerr/cause.go index e0f25e599bf86..b1ebb38a9726f 100644 --- a/pkg/common/moerr/cause.go +++ b/pkg/common/moerr/cause.go @@ -249,6 +249,7 @@ var ( //pkg/vm/engine/tae/db/merge CauseCleanUpUselessFiles = NewInternalError(context.Background(), "CleanUpUselessFiles") CauseOnObject = NewInternalError(context.Background(), "OnObject") + CauseCreateCNMerge = NewInternalError(context.Background(), "CreateCNMergeTask") //pkg/vm/engine/tae/logstore/driver/logservicedriver CauseDriverAppender1 = NewInternalError(context.Background(), "DriverAppender append 1") CauseDriverAppender2 = NewInternalError(context.Background(), "DriverAppender append 2") diff --git a/pkg/taskservice/mysql_task_storage.go b/pkg/taskservice/mysql_task_storage.go index 9dc580753c6f8..b52eb1b325283 100644 
--- a/pkg/taskservice/mysql_task_storage.go +++ b/pkg/taskservice/mysql_task_storage.go @@ -270,7 +270,9 @@ func (m *mysqlTaskStorage) UpdateAsyncTask(ctx context.Context, tasks []task.Asy } if t.ExecuteResult != nil { execResult.Code = t.ExecuteResult.Code - execResult.Error = t.ExecuteResult.Error + if len(execResult.Error) > 1000 { + execResult.Error = execResult.Error[:1000] + } } j, err := json.Marshal(t.Metadata.Options) diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index 287f7e260a456..e40ba04fcfa61 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -2172,13 +2172,15 @@ func (tbl *txnTable) MergeObjects( sortKeyPos, sortKeyIsPK := tbl.getSortKeyPosAndSortKeyIsPK() - // check object visibility - for _, objstat := range objStats { + // check object visibility and set object stats. + for i, objstat := range objStats { info, exist := state.GetObject(*objstat.ObjectShortName()) if !exist || (!info.DeleteTime.IsEmpty() && info.DeleteTime.LE(&snapshot)) { logutil.Errorf("object not visible: %s", info.String()) return nil, moerr.NewInternalErrorNoCtxf("object %s not exist", objstat.ObjectName().String()) } + objectio.SetObjectStats(&objstat, &info.ObjectStats) + objStats[i] = objstat } tbl.ensureSeqnumsAndTypesExpectRowid() @@ -2192,7 +2194,7 @@ func (tbl *txnTable) MergeObjects( return nil, err } - err = mergesort.DoMergeAndWrite(ctx, tbl.getTxn().op.Txn().DebugString(), sortKeyPos, taskHost, false) + err = mergesort.DoMergeAndWrite(ctx, tbl.getTxn().op.Txn().DebugString(), sortKeyPos, taskHost) if err != nil { taskHost.commitEntry.Err = err.Error() return taskHost.commitEntry, err diff --git a/pkg/vm/engine/tae/catalog/table.go b/pkg/vm/engine/tae/catalog/table.go index 9d010e201d0a4..9d3b09aa6e481 100644 --- a/pkg/vm/engine/tae/catalog/table.go +++ b/pkg/vm/engine/tae/catalog/table.go @@ -19,6 +19,7 @@ import ( "context" "fmt" "math" + "slices" "sync/atomic" pkgcatalog "github.com/matrixorigin/matrixone/pkg/catalog" @@ -379,6 +380,7 @@ func (entry *TableEntry) ObjectStats(level common.PPLevel, start, end int, isTom needCount = math.MaxInt } + objEntries := make([]*ObjectEntry, 0) for it.Next() { objectEntry := it.Item() if !objectEntry.IsActive() { @@ -392,6 +394,8 @@ func (entry *TableEntry) ObjectStats(level common.PPLevel, start, end int, isTom break } needCount-- + objEntries = append(objEntries, objectEntry) + stat.ObjectCnt += 1 if objectEntry.GetLoaded() { stat.Loaded += 1 @@ -399,21 +403,36 @@ func (entry *TableEntry) ObjectStats(level common.PPLevel, start, end int, isTom stat.OSize += int(objectEntry.OriginSize()) stat.Csize += int(objectEntry.Size()) } - if level > common.PPL0 { - _ = w.WriteByte('\n') - _, _ = w.WriteString(objectEntry.ID().String()) + } + + slices.SortFunc(objEntries, func(a, b *ObjectEntry) int { + zmA := a.SortKeyZoneMap() + zmB := b.SortKeyZoneMap() + + c := zmA.CompareMin(zmB) + if c != 0 { + return c + } + return zmA.CompareMax(zmB) + }) + + if level > common.PPL0 { + for _, objEntry := range objEntries { _ = w.WriteByte('\n') - _, _ = w.WriteString(" ") - _, _ = w.WriteString(objectEntry.StatsString(zonemapKind)) + _, _ = w.WriteString(objEntry.ID().String()) + _, _ = w.WriteString("\n ") + _, _ = w.WriteString(objEntry.StatsString(zonemapKind)) + + if w.Len() > 8*common.Const1MBytes { + w.WriteString("\n...(truncated for too long, more than 8 MB)") + break + } } - if w.Len() > 8*common.Const1MBytes { - w.WriteString("\n...(truncated for too long, more than 8 MB)") - break 
+ if stat.ObjectCnt > 0 { + w.WriteByte('\n') + } } - if level > common.PPL0 && stat.ObjectCnt > 0 { - w.WriteByte('\n') - } + return } @@ -428,8 +447,10 @@ func (entry *TableEntry) ObjectStatsString(level common.PPLevel, start, end int, } summary := fmt.Sprintf( - "summary: %d total, %d unknown, avgRow %d, avgOsize %s, avgCsize %v", - stat.ObjectCnt, stat.ObjectCnt-stat.Loaded, avgRow, common.HumanReadableBytes(avgOsize), common.HumanReadableBytes(avgCsize), + "summary: %d objs, %d unloaded, total original size:%s, average original size:%s, average rows:%d, average compressed size:%s", + stat.ObjectCnt, stat.ObjectCnt-stat.Loaded, + common.HumanReadableBytes(stat.OSize), common.HumanReadableBytes(avgOsize), + avgRow, common.HumanReadableBytes(avgCsize), ) detail.WriteString(summary) return detail.String() diff --git a/pkg/vm/engine/tae/common/stats.go b/pkg/vm/engine/tae/common/stats.go index 525d14aacad93..453ee917426a3 100644 --- a/pkg/vm/engine/tae/common/stats.go +++ b/pkg/vm/engine/tae/common/stats.go @@ -24,7 +24,7 @@ import ( ) const ( - DefaultMinOsizeQualifiedMB = 110 // MB + DefaultMinOsizeQualifiedMB = 90 // MB DefaultMaxOsizeObjMB = 128 // MB DefaultMinCNMergeSize = 80000 // MB DefaultCNMergeMemControlHint = 8192 // MB diff --git a/pkg/vm/engine/tae/db/db.go b/pkg/vm/engine/tae/db/db.go index 6f8d5032021ba..addbd017ac863 100644 --- a/pkg/vm/engine/tae/db/db.go +++ b/pkg/vm/engine/tae/db/db.go @@ -71,8 +71,6 @@ type DB struct { DBLocker io.Closer - CNMergeSched merge.CNMergeScheduler - Closed *atomic.Value } diff --git a/pkg/vm/engine/tae/db/merge/cnScheduler.go b/pkg/vm/engine/tae/db/merge/cnScheduler.go new file mode 100644 index 0000000000000..703ecf2fa6ad7 --- /dev/null +++ b/pkg/vm/engine/tae/db/merge/cnScheduler.go @@ -0,0 +1,154 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package merge + +import ( + "bytes" + "context" + "fmt" + "strconv" + "sync" + "time" + + "github.com/matrixorigin/matrixone/pkg/common/moerr" + "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/pb/api" + taskpb "github.com/matrixorigin/matrixone/pkg/pb/task" + "github.com/matrixorigin/matrixone/pkg/taskservice" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" +) + +func NewTaskServiceGetter(getter taskservice.Getter) *CNMergeScheduler { + return &CNMergeScheduler{ + getter: getter, + activeObjects: struct { + sync.Mutex + o map[objectio.ObjectId]activeEntry + }{o: make(map[objectio.ObjectId]activeEntry)}, + } +} + +type CNMergeScheduler struct { + getter taskservice.Getter + + activeObjects struct { + sync.Mutex + o map[objectio.ObjectId]activeEntry + } +} + +func (s *CNMergeScheduler) sendMergeTask(ctx context.Context, task *api.MergeTaskEntry) error { + ts, ok := s.getter() + if !ok { + return taskservice.ErrNotReady + } + taskIDPrefix := "Merge:" + strconv.Itoa(int(task.TblId)) + asyncTask, err := ts.QueryAsyncTask(ctx, + taskservice.WithTaskMetadataId(taskservice.LIKE, taskIDPrefix+"%"), + taskservice.WithTaskStatusCond(taskpb.TaskStatus_Created, taskpb.TaskStatus_Running)) + if err != nil { + return err + } + if len(asyncTask) != 0 { + return moerr.NewInternalError(ctx, fmt.Sprintf("table %q is merging", task.TableName)) + } + b, err := task.Marshal() + if err != nil { + return err + } + return ts.CreateAsyncTask(ctx, + taskpb.TaskMetadata{ + ID: taskIDPrefix + ":" + strconv.FormatInt(time.Now().Unix(), 10), + Executor: taskpb.TaskCode_MergeObject, + Context: b, + Options: taskpb.TaskOptions{Resource: &taskpb.Resource{Memory: task.EstimatedMemUsage}}, + }) +} + +func (s *CNMergeScheduler) addActiveObjects(entries []*catalog.ObjectEntry) { + s.activeObjects.Lock() + for _, entry := range entries { + s.activeObjects.o[*entry.ID()] = activeEntry{ + entry.GetTable().ID, + time.Now(), + } + } + s.activeObjects.Unlock() +} + +func (s *CNMergeScheduler) checkOverlapOnCNActive(entries []*catalog.ObjectEntry) bool { + s.activeObjects.Lock() + defer s.activeObjects.Unlock() + for _, entry := range entries { + if _, ok := s.activeObjects.o[*entry.ID()]; ok { + return true + } + } + return false +} + +func (s *CNMergeScheduler) activeObjsString() string { + s.activeObjects.Lock() + defer s.activeObjects.Unlock() + + b := &bytes.Buffer{} + now := time.Now() + for k, v := range s.activeObjects.o { + b.WriteString(fmt.Sprintf(" id: %v, table: %v, insertAt: %s ago\n", + k.String(), v.tid, now.Sub(v.insertAt).String())) + } + return b.String() +} + +func (s *CNMergeScheduler) removeActiveObject(ids []objectio.ObjectId) { + s.activeObjects.Lock() + defer s.activeObjects.Unlock() + for _, id := range ids { + delete(s.activeObjects.o, id) + } +} + +func (s *CNMergeScheduler) prune(id uint64, ago time.Duration) { + s.activeObjects.Lock() + defer s.activeObjects.Unlock() + now := time.Now() + if ago == 0 { + for k, v := range s.activeObjects.o { + if v.tid == id { + delete(s.activeObjects.o, k) + } + } + return + } + + if id == 0 && ago > 1*time.Second { + for k, v := range s.activeObjects.o { + if now.Sub(v.insertAt) > ago { + delete(s.activeObjects.o, k) + } + } + return + } + for k, v := range s.activeObjects.o { + if v.tid == id && now.Sub(v.insertAt) > ago { + delete(s.activeObjects.o, k) + } + } +} + +type activeEntry struct { + tid uint64 + insertAt time.Time +} diff --git a/pkg/vm/engine/tae/db/merge/cnScheduler_test.go 
b/pkg/vm/engine/tae/db/merge/cnScheduler_test.go new file mode 100644 index 0000000000000..1a947dc4492ef --- /dev/null +++ b/pkg/vm/engine/tae/db/merge/cnScheduler_test.go @@ -0,0 +1,117 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package merge + +import ( + "context" + "github.com/matrixorigin/matrixone/pkg/common/runtime" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/pb/api" + "github.com/matrixorigin/matrixone/pkg/taskservice" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/dbutils" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/txn/txnbase" + "github.com/stretchr/testify/require" + "slices" + "testing" + "time" +) + +func TestScheduler_CNActiveObjectsString(t *testing.T) { + memStorage := taskservice.NewMemTaskStorage() + cnScheduler := NewTaskServiceGetter(func() (taskservice.TaskService, bool) { + return taskservice.NewTaskService(runtime.DefaultRuntime(), memStorage), true + }) + + cata := catalog.MockCatalog() + defer cata.Close() + txnMgr := txnbase.NewTxnManager(catalog.MockTxnStoreFactory(cata), catalog.MockTxnFactory(cata), types.NewMockHLCClock(1)) + txnMgr.Start(context.Background()) + defer txnMgr.Stop() + txn1, _ := txnMgr.StartTxn(nil) + db, err := cata.CreateDBEntry("db", "", "", txn1) + require.NoError(t, err) + catalog.MockSchema(1, 0) + tbl, err := db.CreateTableEntry(catalog.MockSchema(1, 0), txn1, nil) + require.NoError(t, err) + require.NoError(t, txn1.Commit(context.Background())) + schema := tbl.GetLastestSchema(false) + + txn2, _ := txnMgr.StartTxn(nil) + entry := newSortedDataEntryWithTableEntry(t, tbl, txn2, 0, 1, overlapSizeThreshold) + stat := *entry.GetObjectStats() + cnScheduler.addActiveObjects([]*catalog.ObjectEntry{entry}) + require.NotEmpty(t, cnScheduler.activeObjsString()) + + cnScheduler.removeActiveObject([]objectio.ObjectId{*entry.ID()}) + require.Empty(t, cnScheduler.activeObjsString()) + + taskEntry := &api.MergeTaskEntry{ + AccountId: schema.AcInfo.TenantID, + UserId: schema.AcInfo.UserID, + RoleId: schema.AcInfo.RoleID, + TblId: tbl.ID, + DbId: tbl.GetDB().GetID(), + TableName: tbl.GetLastestSchema(false).Name, + DbName: tbl.GetDB().GetName(), + ToMergeObjs: [][]byte{stat[:]}, + } + + require.NoError(t, cnScheduler.sendMergeTask(context.Background(), taskEntry)) + tasks, err := memStorage.QueryAsyncTask(context.Background()) + if err != nil { + return + } + require.Equal(t, 1, len(tasks)) + meta := new(api.MergeTaskEntry) + require.NoError(t, meta.Unmarshal(tasks[0].Metadata.Context)) + require.Equal(t, meta.DbName, tbl.GetDB().GetName()) + require.Error(t, cnScheduler.sendMergeTask(context.Background(), taskEntry)) +} + +func TestExecutorCNMerge(t *testing.T) { + + cata := catalog.MockCatalog() + defer cata.Close() + txnMgr := txnbase.NewTxnManager(catalog.MockTxnStoreFactory(cata), 
catalog.MockTxnFactory(cata), types.NewMockHLCClock(1)) + txnMgr.Start(context.Background()) + defer txnMgr.Stop() + txn1, _ := txnMgr.StartTxn(nil) + db, err := cata.CreateDBEntry("db", "", "", txn1) + require.NoError(t, err) + catalog.MockSchema(1, 0) + tbl, err := db.CreateTableEntry(catalog.MockSchema(1, 0), txn1, nil) + require.NoError(t, err) + require.NoError(t, txn1.Commit(context.Background())) + + txn2, _ := txnMgr.StartTxn(nil) + entry := newSortedDataEntryWithTableEntry(t, tbl, txn2, 0, 1, overlapSizeThreshold) + + memStorage := taskservice.NewMemTaskStorage() + cnScheduler := NewTaskServiceGetter(func() (taskservice.TaskService, bool) { + return taskservice.NewTaskService(runtime.DefaultRuntime(), memStorage), true + }) + executor := newMergeExecutor(&dbutils.Runtime{}, cnScheduler) + executor.executeFor(tbl, []*catalog.ObjectEntry{entry}, taskHostCN) + require.NotEmpty(t, cnScheduler.activeObjsString()) + + executor.executeFor(tbl, []*catalog.ObjectEntry{entry}, taskHostCN) + entry2 := newSortedDataEntryWithTableEntry(t, tbl, txn2, 0, 1, overlapSizeThreshold) + executor.executeFor(tbl, slices.Repeat([]*catalog.ObjectEntry{entry2}, 31), taskHostCN) + + executor.cnSched.prune(0, 0) + executor.cnSched.prune(0, time.Hour) +} diff --git a/pkg/vm/engine/tae/db/merge/config.go b/pkg/vm/engine/tae/db/merge/config.go index 75c0a96b80296..5b54dd19b38bb 100644 --- a/pkg/vm/engine/tae/db/merge/config.go +++ b/pkg/vm/engine/tae/db/merge/config.go @@ -28,8 +28,7 @@ import ( ) var ( - _ policy = (*basic)(nil) - defaultBasicConfig = &BasicPolicyConfig{ + defaultBasicConfig = &BasicPolicyConfig{ MergeMaxOneRun: common.DefaultMaxMergeObjN, MaxOsizeMergedObj: common.DefaultMaxOsizeObjMB * common.Const1MBytes, ObjectMinOsize: common.DefaultMinOsizeQualifiedMB * common.Const1MBytes, diff --git a/pkg/vm/engine/tae/db/merge/config_test.go b/pkg/vm/engine/tae/db/merge/config_test.go index eea9882592b6b..8b1cf5cd4f0fe 100644 --- a/pkg/vm/engine/tae/db/merge/config_test.go +++ b/pkg/vm/engine/tae/db/merge/config_test.go @@ -35,7 +35,7 @@ func newTestSchema(colCnt, pkIdx int, config *BasicPolicyConfig) *catalog.Schema } func TestString(t *testing.T) { - require.Equal(t, "minOsizeObj:110MiB, maxOneRun:16, maxOsizeMergedObj: 128MiB, offloadToCNSize:78.12GiB, hints: []", defaultBasicConfig.String()) + require.Equal(t, "minOsizeObj:90MiB, maxOneRun:16, maxOsizeMergedObj: 128MiB, offloadToCNSize:78.12GiB, hints: []", defaultBasicConfig.String()) } func TestConfigForTable(t *testing.T) { @@ -77,5 +77,5 @@ func TestConfigForTable(t *testing.T) { config = configProvider.getConfig(tbl3) require.Equal(t, defaultBasicConfig.MaxOsizeMergedObj, config.MaxOsizeMergedObj) - require.Equal(t, "customConfigProvider: 0-:115343360,2 | ", configProvider.String()) + require.Equal(t, "customConfigProvider: 0-:94371840,2 | ", configProvider.String()) } diff --git a/pkg/vm/engine/tae/db/merge/executor.go b/pkg/vm/engine/tae/db/merge/executor.go index ab1e44ba60587..34ac1fe1a70d2 100644 --- a/pkg/vm/engine/tae/db/merge/executor.go +++ b/pkg/vm/engine/tae/db/merge/executor.go @@ -15,237 +15,111 @@ package merge import ( - "bytes" "context" "errors" "fmt" - "math" - "os" - "sync" - "sync/atomic" + "slices" + "time" - "github.com/KimMachineGun/automemlimit/memlimit" + "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/logutil" - "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/pb/api" - v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" 
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/dbutils" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/jobs" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" - "github.com/shirou/gopsutil/v3/cpu" - "github.com/shirou/gopsutil/v3/mem" - "github.com/shirou/gopsutil/v3/process" ) -type activeTaskStats map[uint64]struct { - blk int - estBytes int -} - // executor consider resources to decide to merge or not. type executor struct { - tableName string - rt *dbutils.Runtime - cnSched CNMergeScheduler - memLimit int - memUsing int - transPageLimit uint64 - cpuPercent float64 - activeMergeBlkCount atomic.Int32 - activeEstimateBytes atomic.Int64 - roundMergeRows uint64 - taskConsume struct { - sync.Mutex - o map[objectio.ObjectId]struct{} - m activeTaskStats - } + rt *dbutils.Runtime + cnSched *CNMergeScheduler } -func newMergeExecutor(rt *dbutils.Runtime, sched CNMergeScheduler) *executor { +func newMergeExecutor(rt *dbutils.Runtime, sched *CNMergeScheduler) *executor { return &executor{ rt: rt, cnSched: sched, } } -func (e *executor) setMemLimit(total uint64) { - containerMLimit, err := memlimit.FromCgroup() - if containerMLimit > 0 && containerMLimit < total { - e.memLimit = int(float64(containerMLimit) * 0.9) - } else { - e.memLimit = int(float64(total) * 0.9) - } - - if e.memLimit > 200*common.Const1GBytes { - e.transPageLimit = uint64(e.memLimit / 25 * 2) // 8% - } else if e.memLimit > 100*common.Const1GBytes { - e.transPageLimit = uint64(e.memLimit / 25 * 3) // 12% - } else if e.memLimit > 40*common.Const1GBytes { - e.transPageLimit = uint64(e.memLimit / 25 * 4) // 16% - } else { - e.transPageLimit = math.MaxUint64 // no limit +func (e *executor) executeFor(entry *catalog.TableEntry, objs []*catalog.ObjectEntry, kind taskHostKind) { + if len(objs) == 0 { + return } - - logutil.Info( - "MergeExecutorMemoryInfo", - common.AnyField("container-limit", common.HumanReadableBytes(int(containerMLimit))), - common.AnyField("host-memory", common.HumanReadableBytes(int(total))), - common.AnyField("process-limit", common.HumanReadableBytes(e.memLimit)), - common.AnyField("transfer-page-limit", common.HumanReadableBytes(int(e.transPageLimit))), - common.AnyField("error", err), - ) -} - -var proc *process.Process - -func (e *executor) refreshMemInfo() { - if proc == nil { - proc, _ = process.NewProcess(int32(os.Getpid())) - } else if mem, err := proc.MemoryInfo(); err == nil { - e.memUsing = int(mem.RSS) + // check objects are merging by CNs. 
+ if e.cnSched.checkOverlapOnCNActive(objs) { + return } - if e.memLimit == 0 { - if stats, err := mem.VirtualMemory(); err == nil { - e.setMemLimit(stats.Total) + isTombstone := objs[0].IsTombstone + for _, o := range objs { + if o.IsTombstone != isTombstone { + panic("merging tombstone and data objects in one merge") } } - if percents, err := cpu.Percent(0, false); err == nil { - e.cpuPercent = percents[0] - } - e.roundMergeRows = 0 -} - -func (e *executor) printStats() { - cnt := e.activeMergeBlkCount.Load() - if cnt == 0 && e.memAvailBytes() > 512*common.Const1MBytes { + if kind == taskHostDN { + e.scheduleMergeObjects(slices.Clone(objs), entry, isTombstone) return } - logutil.Info( - "MergeExecutorMemoryStats", - common.AnyField("process-limit", common.HumanReadableBytes(e.memLimit)), - common.AnyField("process-mem", common.HumanReadableBytes(e.memUsing)), - common.AnyField("inuse-mem", common.HumanReadableBytes(int(e.activeEstimateBytes.Load()))), - common.AnyField("inuse-cnt", cnt), - ) -} - -func (e *executor) addActiveTask(taskId uint64, blkn, esize int) { - e.activeEstimateBytes.Add(int64(esize)) - e.activeMergeBlkCount.Add(int32(blkn)) - e.taskConsume.Lock() - if e.taskConsume.m == nil { - e.taskConsume.m = make(activeTaskStats) + // prevent CN OOM + if len(objs) > 30 { + objs = objs[:30] } - e.taskConsume.m[taskId] = struct { - blk int - estBytes int - }{blkn, esize} - e.taskConsume.Unlock() -} -func (e *executor) OnExecDone(v any) { - task := v.(tasks.MScopedTask) - - e.taskConsume.Lock() - stat := e.taskConsume.m[task.ID()] - delete(e.taskConsume.m, task.ID()) - e.taskConsume.Unlock() - - e.activeMergeBlkCount.Add(-int32(stat.blk)) - e.activeEstimateBytes.Add(-int64(stat.estBytes)) -} - -func (e *executor) executeFor(entry *catalog.TableEntry, mobjs []*catalog.ObjectEntry, kind TaskHostKind) { - if e.roundMergeRows*36 /*28 * 1.3 */ > e.transPageLimit/8 { + stats := make([][]byte, 0, len(objs)) + cids := make([]common.ID, 0, len(objs)) + for _, obj := range objs { + stat := *obj.GetObjectStats() + stats = append(stats, stat[:]) + cids = append(cids, *obj.AsCommonID()) + } + // check objects are merging by TN. 
+ if e.rt.Scheduler != nil && e.rt.Scheduler.CheckAsyncScopes(cids) != nil { return } - e.tableName = fmt.Sprintf("%v-%v", entry.ID, entry.GetLastestSchema(false).Name) - - if ActiveCNObj.CheckOverlapOnCNActive(mobjs) { + schema := entry.GetLastestSchema(false) + cntask := &api.MergeTaskEntry{ + AccountId: schema.AcInfo.TenantID, + UserId: schema.AcInfo.UserID, + RoleId: schema.AcInfo.RoleID, + TblId: entry.ID, + DbId: entry.GetDB().GetID(), + TableName: entry.GetLastestSchema(isTombstone).Name, + DbName: entry.GetDB().GetName(), + ToMergeObjs: stats, + EstimatedMemUsage: uint64(estimateMergeSize(objs)), + } + ctx, cancel := context.WithTimeoutCause(context.Background(), 10*time.Second, moerr.CauseCreateCNMerge) + defer cancel() + err := e.cnSched.sendMergeTask(ctx, cntask) + if err != nil { + logutil.Info("MergeExecutorError", + common.OperationField("send-cn-task"), + common.AnyField("task", fmt.Sprintf("table-%d-%s", cntask.TblId, cntask.TableName)), + common.AnyField("error", err), + ) return } - if kind == TaskHostCN { - osize, esize := estimateMergeConsume(mobjs) - blkCnt := 0 - for _, obj := range mobjs { - blkCnt += obj.BlockCnt() - } - stats := make([][]byte, 0, len(mobjs)) - cids := make([]common.ID, 0, len(mobjs)) - for _, obj := range mobjs { - stat := *obj.GetObjectStats() - stats = append(stats, stat[:]) - cids = append(cids, *obj.AsCommonID()) - } - if e.rt.Scheduler.CheckAsyncScopes(cids) != nil { - return - } - schema := entry.GetLastestSchema(false) - cntask := &api.MergeTaskEntry{ - AccountId: schema.AcInfo.TenantID, - UserId: schema.AcInfo.UserID, - RoleId: schema.AcInfo.RoleID, - TblId: entry.ID, - DbId: entry.GetDB().GetID(), - TableName: entry.GetLastestSchema(false).Name, - DbName: entry.GetDB().GetName(), - ToMergeObjs: stats, - EstimatedMemUsage: uint64(esize), - } - if err := e.cnSched.SendMergeTask(context.TODO(), cntask); err == nil { - ActiveCNObj.AddActiveCNObj(mobjs) - logMergeTask(e.tableName, math.MaxUint64, mobjs, blkCnt, osize, esize) - } else { - logutil.Info( - "MergeExecutorError", - common.OperationField("send-cn-task"), - common.AnyField("task", fmt.Sprintf("table-%d-%s", cntask.TblId, cntask.TableName)), - common.AnyField("error", err), - ) - return - } - entry.Stats.SetLastMergeTime() - } else { - objScopes := make([]common.ID, 0) - tombstoneScopes := make([]common.ID, 0) - objs := make([]*catalog.ObjectEntry, 0) - tombstones := make([]*catalog.ObjectEntry, 0) - objectBlkCnt := 0 - tombstoneBlkCnt := 0 - for _, obj := range mobjs { - if obj.IsTombstone { - tombstoneBlkCnt += obj.BlockCnt() - tombstones = append(tombstones, obj) - tombstoneScopes = append(tombstoneScopes, *obj.AsCommonID()) - } else { - objectBlkCnt += obj.BlockCnt() - objs = append(objs, obj) - objScopes = append(objScopes, *obj.AsCommonID()) - } - } + e.cnSched.addActiveObjects(objs) + entry.Stats.SetLastMergeTime() +} - if len(objs) > 0 { - e.scheduleMergeObjects(objScopes, objs, objectBlkCnt, entry, false) - } - if len(tombstones) > 1 { - e.scheduleMergeObjects(tombstoneScopes, tombstones, tombstoneBlkCnt, entry, true) - } +func (e *executor) scheduleMergeObjects(mObjs []*catalog.ObjectEntry, entry *catalog.TableEntry, isTombstone bool) { + scopes := make([]common.ID, 0, len(mObjs)) + for _, obj := range mObjs { + scopes = append(scopes, *obj.AsCommonID()) } -} -func (e *executor) scheduleMergeObjects(scopes []common.ID, mobjs []*catalog.ObjectEntry, blkCnt int, entry *catalog.TableEntry, isTombstone bool) { - osize, esize := estimateMergeConsume(mobjs) factory := func(ctx 
*tasks.Context, txn txnif.AsyncTxn) (tasks.Task, error) { txn.GetMemo().IsFlushOrMerge = true - return jobs.NewMergeObjectsTask(ctx, txn, mobjs, e.rt, common.DefaultMaxOsizeObjMB*common.Const1MBytes, isTombstone) + return jobs.NewMergeObjectsTask(ctx, txn, mObjs, e.rt, common.DefaultMaxOsizeObjMB*common.Const1MBytes, isTombstone) } - task, err := e.rt.Scheduler.ScheduleMultiScopedTxnTaskWithObserver(nil, tasks.DataCompactionTask, scopes, factory, e) + task, err := e.rt.Scheduler.ScheduleMultiScopedTxnTask(nil, tasks.DataCompactionTask, scopes, factory) if err != nil { if !errors.Is(err, tasks.ErrScheduleScopeConflict) { logutil.Info( @@ -257,58 +131,5 @@ func (e *executor) scheduleMergeObjects(scopes []common.ID, mobjs []*catalog.Obj } return } - e.addActiveTask(task.ID(), blkCnt, esize) - for _, obj := range mobjs { - e.roundMergeRows += uint64(obj.Rows()) - } - logMergeTask(e.tableName, task.ID(), mobjs, blkCnt, osize, esize) entry.Stats.SetLastMergeTime() } - -func (e *executor) memAvailBytes() int { - merging := int(e.activeEstimateBytes.Load()) - avail := e.memLimit - e.memUsing - merging - if avail < 0 { - avail = 0 - } - return avail -} - -func (e *executor) transferPageSizeLimit() uint64 { - return e.transPageLimit -} - -func (e *executor) CPUPercent() int64 { - return int64(e.cpuPercent) -} - -func logMergeTask(name string, taskId uint64, merges []*catalog.ObjectEntry, blkn, osize, esize int) { - rows := 0 - infoBuf := &bytes.Buffer{} - for _, obj := range merges { - r := int(obj.Rows()) - rows += r - infoBuf.WriteString(fmt.Sprintf(" %d(%s)", r, obj.ID().ShortStringEx())) - } - platform := fmt.Sprintf("t%d", taskId) - if taskId == math.MaxUint64 { - platform = "CN" - v2.TaskCNMergeScheduledByCounter.Inc() - v2.TaskCNMergedSizeCounter.Add(float64(osize)) - } else { - v2.TaskDNMergeScheduledByCounter.Inc() - v2.TaskDNMergedSizeCounter.Add(float64(osize)) - } - logutil.Info( - "MergeExecutor", - common.OperationField("schedule-merge-task"), - common.AnyField("name", name), - common.AnyField("platform", platform), - common.AnyField("num-obj", len(merges)), - common.AnyField("num-blk", blkn), - common.AnyField("orig-size", common.HumanReadableBytes(osize)), - common.AnyField("est-size", common.HumanReadableBytes(esize)), - common.AnyField("rows", rows), - common.AnyField("info", infoBuf.String()), - ) -} diff --git a/pkg/vm/engine/tae/db/merge/meminfo_darwin.go b/pkg/vm/engine/tae/db/merge/meminfo_darwin.go new file mode 100644 index 0000000000000..6d7dee835ef34 --- /dev/null +++ b/pkg/vm/engine/tae/db/merge/meminfo_darwin.go @@ -0,0 +1,33 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package merge + +import ( + "syscall" + "unsafe" +) + +func totalMem() uint64 { + s, err := syscall.Sysctl("hw.memsize") + if err != nil { + return 0 + } + // hack because the string conversion above drops a \0 + b := []byte(s) + if len(b) < 8 { + b = append(b, 0) + } + return *(*uint64)(unsafe.Pointer(&b[0])) +} diff --git a/pkg/vm/engine/tae/db/merge/meminfo_linux.go b/pkg/vm/engine/tae/db/merge/meminfo_linux.go new file mode 100644 index 0000000000000..57a10daf214ac --- /dev/null +++ b/pkg/vm/engine/tae/db/merge/meminfo_linux.go @@ -0,0 +1,26 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package merge + +import "syscall" + +func totalMem() uint64 { + in := new(syscall.Sysinfo_t) + err := syscall.Sysinfo(in) + if err != nil { + return 0 + } + return in.Totalram * uint64(in.Unit) +} diff --git a/pkg/vm/engine/tae/db/merge/mod.go b/pkg/vm/engine/tae/db/merge/mod.go deleted file mode 100644 index 6c72813f1a251..0000000000000 --- a/pkg/vm/engine/tae/db/merge/mod.go +++ /dev/null @@ -1,216 +0,0 @@ -// Copyright 2023 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package merge - -import ( - "bytes" - "context" - "fmt" - "strconv" - "sync" - "sync/atomic" - "time" - - "github.com/matrixorigin/matrixone/pkg/common/moerr" - "github.com/matrixorigin/matrixone/pkg/fileservice" - "github.com/matrixorigin/matrixone/pkg/objectio" - "github.com/matrixorigin/matrixone/pkg/pb/api" - taskpb "github.com/matrixorigin/matrixone/pkg/pb/task" - "github.com/matrixorigin/matrixone/pkg/taskservice" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" -) - -var StopMerge atomic.Bool - -type CNMergeScheduler interface { - SendMergeTask(ctx context.Context, task *api.MergeTaskEntry) error -} - -func NewTaskServiceGetter(getter taskservice.Getter) CNMergeScheduler { - return &taskServiceGetter{ - Getter: getter, - } -} - -type taskServiceGetter struct { - taskservice.Getter -} - -func (tsg *taskServiceGetter) SendMergeTask(ctx context.Context, task *api.MergeTaskEntry) error { - ts, ok := tsg.Getter() - if !ok { - return taskservice.ErrNotReady - } - taskIDPrefix := "Merge:" + task.TableName - asyncTask, err := ts.QueryAsyncTask(ctx, - taskservice.WithTaskMetadataId(taskservice.LIKE, taskIDPrefix+"%"), - taskservice.WithTaskStatusCond(taskpb.TaskStatus_Created, taskpb.TaskStatus_Running)) - if err != nil { - return err - } - if len(asyncTask) != 0 { - return moerr.NewInternalError(ctx, fmt.Sprintf("table %q is merging", task.TableName)) - } - b, err := task.Marshal() - if err != nil { - return err - } - return ts.CreateAsyncTask(ctx, - taskpb.TaskMetadata{ - ID: taskIDPrefix + ":" + strconv.FormatInt(time.Now().Unix(), 10), - Executor: taskpb.TaskCode_MergeObject, - Context: b, - Options: taskpb.TaskOptions{Resource: &taskpb.Resource{Memory: task.EstimatedMemUsage}}, - }) -} - -type TaskHostKind int - -const ( - TaskHostCN TaskHostKind = iota - TaskHostDN -) - -type activeEntry struct { - tid uint64 - insertAt time.Time -} - -var ActiveCNObj = ActiveCNObjMap{ - o: make(map[objectio.ObjectId]activeEntry), -} - -type ActiveCNObjMap struct { - sync.Mutex - o map[objectio.ObjectId]activeEntry -} - -func (e *ActiveCNObjMap) Prune(id uint64, ago time.Duration) { - e.Lock() - defer e.Unlock() - now := time.Now() - if ago == 0 { - for k, v := range e.o { - if v.tid == id { - delete(e.o, k) - } - } - return - } - - if id == 0 && ago > 1*time.Second { - for k, v := range e.o { - if now.Sub(v.insertAt) > ago { - delete(e.o, k) - } - } - return - } - for k, v := range e.o { - if v.tid == id && now.Sub(v.insertAt) > ago { - delete(e.o, k) - } - } -} - -func (e *ActiveCNObjMap) String() string { - e.Lock() - defer e.Unlock() - - b := &bytes.Buffer{} - now := time.Now() - for k, v := range e.o { - b.WriteString(fmt.Sprintf(" id: %v, table: %v, insertAt: %s ago\n", - k.String(), v.tid, now.Sub(v.insertAt).String())) - } - return b.String() -} - -func (e *ActiveCNObjMap) AddActiveCNObj(entries []*catalog.ObjectEntry) { - e.Lock() - for _, entry := range entries { - e.o[*entry.ID()] = activeEntry{ - entry.GetTable().ID, - time.Now(), - } - } - e.Unlock() -} - -func (e *ActiveCNObjMap) RemoveActiveCNObj(ids []objectio.ObjectId) { - e.Lock() - defer e.Unlock() - for _, id := range ids { - delete(e.o, id) - } -} - -func (e *ActiveCNObjMap) CheckOverlapOnCNActive(entries []*catalog.ObjectEntry) bool { - e.Lock() - defer e.Unlock() - for _, entry := range entries { - if _, ok := e.o[*entry.ID()]; ok { - return true - } - } - return false -} - -func CleanUpUselessFiles(entry *api.MergeCommitEntry, fs 
fileservice.FileService) { - if entry == nil { - return - } - ctx, cancel := context.WithTimeoutCause(context.Background(), 2*time.Minute, moerr.CauseCleanUpUselessFiles) - defer cancel() - for _, filepath := range entry.BookingLoc { - _ = fs.Delete(ctx, filepath) - } - if len(entry.CreatedObjs) != 0 { - for _, obj := range entry.CreatedObjs { - if len(obj) == 0 { - continue - } - s := objectio.ObjectStats(obj) - _ = fs.Delete(ctx, s.ObjectName().String()) - } - } -} - -const ( - constMaxMemCap = 12 * common.Const1GBytes // max original memory for an object - constSmallMergeGap = 3 * time.Minute -) - -type policy interface { - onObject(*catalog.ObjectEntry, *BasicPolicyConfig) bool - revise(cpu, mem int64, config *BasicPolicyConfig) []reviseResult - resetForTable(*catalog.TableEntry) -} - -func NewUpdatePolicyReq(c *BasicPolicyConfig) *api.AlterTableReq { - return &api.AlterTableReq{ - Kind: api.AlterKind_UpdatePolicy, - Operation: &api.AlterTableReq_UpdatePolicy{ - UpdatePolicy: &api.AlterTablePolicy{ - MinOsizeQuailifed: c.ObjectMinOsize, - MaxObjOnerun: uint32(c.MergeMaxOneRun), - MaxOsizeMergedObj: c.MaxOsizeMergedObj, - MinCnMergeSize: c.MinCNMergeSize, - Hints: c.MergeHints, - }, - }, - } -} diff --git a/pkg/vm/engine/tae/db/merge/mod_test.go b/pkg/vm/engine/tae/db/merge/mod_test.go deleted file mode 100644 index db0d647588de5..0000000000000 --- a/pkg/vm/engine/tae/db/merge/mod_test.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2024 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package merge - -import ( - "context" - "os" - "path" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/matrixorigin/matrixone/pkg/defines" - "github.com/matrixorigin/matrixone/pkg/fileservice" - "github.com/matrixorigin/matrixone/pkg/pb/api" -) - -func Test_CleanUpUselessFiles(t *testing.T) { - tDir := os.TempDir() - dir := path.Join(tDir, "/local") - assert.NoError(t, os.RemoveAll(dir)) - defer func() { - _ = os.RemoveAll(dir) - }() - - c := fileservice.Config{ - Name: defines.ETLFileServiceName, - Backend: "DISK", - DataDir: dir, - Cache: fileservice.DisabledCacheConfig, - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - fs, err := fileservice.NewFileService(ctx, c, nil) - assert.Nil(t, err) - defer fs.Close(ctx) - - ent := &api.MergeCommitEntry{ - BookingLoc: []string{"abc"}, - } - - CleanUpUselessFiles(ent, fs) -} diff --git a/pkg/vm/engine/tae/db/merge/policyCompact.go b/pkg/vm/engine/tae/db/merge/policyCompact.go index f7719bf252d49..2e9db4e65cc6b 100644 --- a/pkg/vm/engine/tae/db/merge/policyCompact.go +++ b/pkg/vm/engine/tae/db/merge/policyCompact.go @@ -16,39 +16,28 @@ package merge import ( "context" + "sort" "time" - "go.uber.org/zap" - - "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/fileservice" - "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" ) +var _ policy = (*objCompactPolicy)(nil) + type objCompactPolicy struct { tblEntry *catalog.TableEntry - objects []*catalog.ObjectEntry fs fileservice.FileService - tombstones []*catalog.ObjectEntry - tombstoneStats []objectio.ObjectStats + objects []*catalog.ObjectEntry - validTombstones map[*catalog.ObjectEntry]struct{} + tombstoneMetas []objectio.ObjectDataMeta } func newObjCompactPolicy(fs fileservice.FileService) *objCompactPolicy { - return &objCompactPolicy{ - objects: make([]*catalog.ObjectEntry, 0), - fs: fs, - - tombstones: make([]*catalog.ObjectEntry, 0), - tombstoneStats: make([]objectio.ObjectStats, 0), - validTombstones: make(map[*catalog.ObjectEntry]struct{}), - } + return &objCompactPolicy{fs: fs} } func (o *objCompactPolicy) onObject(entry *catalog.ObjectEntry, config *BasicPolicyConfig) bool { @@ -61,60 +50,34 @@ func (o *objCompactPolicy) onObject(entry *catalog.ObjectEntry, config *BasicPol if entry.OriginSize() < config.ObjectMinOsize { return false } - if len(o.tombstoneStats) == 0 { - return false - } - ctx, cancel := context.WithTimeoutCause(context.Background(), time.Second*10, moerr.CauseOnObject) - sels, err := blockio.FindTombstonesOfObject(ctx, entry.ID(), o.tombstoneStats, o.fs) - cancel() - if err != nil { - return false - } - iter := sels.Iterator() - for iter.HasNext() { - i := iter.Next() - tombstone := o.tombstones[i] - - o.validTombstones[tombstone] = struct{}{} - - if (entryOutdated(tombstone, config.TombstoneLifetime) && tombstone.OriginSize() > 10*common.Const1MBytes) || - tombstone.OriginSize() > common.DefaultMinOsizeQualifiedMB*common.Const1MBytes { - logutil.Info("[MERGE-POLICY]", - zap.String("policy", "compact"), - zap.String("table", o.tblEntry.GetFullName()), - zap.String("data object", entry.ID().ShortStringEx()), - zap.Uint32("data rows", entry.Rows()), - zap.Uint32("tombstone size", tombstone.OriginSize()), - ) - o.objects = append(o.objects, entry) - 
return true + for _, meta := range o.tombstoneMetas { + if !checkTombstoneMeta(meta, entry.ID()) { + continue } + o.objects = append(o.objects, entry) } return false } -func (o *objCompactPolicy) revise(cpu, mem int64, config *BasicPolicyConfig) []reviseResult { +func (o *objCompactPolicy) revise(rc *resourceController) []reviseResult { if o.tblEntry == nil { return nil } - o.filterValidTombstones() - results := make([]reviseResult, 0, len(o.objects)+len(o.tombstones)) + results := make([]reviseResult, 0, len(o.objects)) for _, obj := range o.objects { - results = append(results, reviseResult{[]*catalog.ObjectEntry{obj}, TaskHostDN}) - } - if len(o.tombstoneStats) > 0 { - results = append(results, reviseResult{o.tombstones, TaskHostDN}) + if rc.resourceAvailable([]*catalog.ObjectEntry{obj}) { + rc.reserveResources([]*catalog.ObjectEntry{obj}) + results = append(results, reviseResult{[]*catalog.ObjectEntry{obj}, taskHostDN}) + } } return results } -func (o *objCompactPolicy) resetForTable(entry *catalog.TableEntry) { +func (o *objCompactPolicy) resetForTable(entry *catalog.TableEntry, config *BasicPolicyConfig) { o.tblEntry = entry + o.tombstoneMetas = o.tombstoneMetas[:0] o.objects = o.objects[:0] - o.tombstones = o.tombstones[:0] - o.tombstoneStats = o.tombstoneStats[:0] - clear(o.validTombstones) tIter := entry.MakeTombstoneObjectIt() for tIter.Next() { @@ -124,21 +87,50 @@ func (o *objCompactPolicy) resetForTable(entry *catalog.TableEntry) { continue } - o.tombstones = append(o.tombstones, tEntry) - o.tombstoneStats = append(o.tombstoneStats, *tEntry.GetObjectStats()) + if tEntry.OriginSize() > common.DefaultMaxOsizeObjMB*common.Const1MBytes { + meta, err := loadTombstoneMeta(tEntry.GetObjectStats(), o.fs) + if err != nil { + continue + } + o.tombstoneMetas = append(o.tombstoneMetas, meta) + } } } -func (o *objCompactPolicy) filterValidTombstones() { - i := 0 - for _, x := range o.tombstones { - if _, ok := o.validTombstones[x]; !ok { - o.tombstones[i] = x - i++ - } +func loadTombstoneMeta(tombstoneObject *objectio.ObjectStats, fs fileservice.FileService) (objectio.ObjectDataMeta, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + location := tombstoneObject.ObjectLocation() + objMeta, err := objectio.FastLoadObjectMeta( + ctx, &location, false, fs, + ) + if err != nil { + return nil, err } - for j := i; j < len(o.tombstones); j++ { - o.tombstones[j] = nil + return objMeta.MustDataMeta(), nil +} + +func checkTombstoneMeta(tombstoneMeta objectio.ObjectDataMeta, objectId *objectio.ObjectId) bool { + prefixPattern := objectId[:] + blkCnt := int(tombstoneMeta.BlockCount()) + + startIdx := sort.Search(blkCnt, func(i int) bool { + return tombstoneMeta.GetBlockMeta(uint32(i)).MustGetColumn(0).ZoneMap().AnyGEByValue(prefixPattern) + }) + + for pos := startIdx; pos < blkCnt; pos++ { + blkMeta := tombstoneMeta.GetBlockMeta(uint32(pos)) + columnZonemap := blkMeta.MustGetColumn(0).ZoneMap() + // block id is the prefixPattern of the rowid and zonemap is min-max of rowid + // !PrefixEq means there is no rowid of this block in this zonemap, so skip + if columnZonemap.RowidPrefixEq(prefixPattern) { + return true + } + if columnZonemap.RowidPrefixGT(prefixPattern) { + // all zone maps are sorted by the rowid + // if the block id is less than the prefixPattern of the min rowid, skip the rest blocks + break + } } - o.tombstones = o.tombstones[:i] + return false } diff --git a/pkg/vm/engine/tae/db/merge/policyBasic.go 
b/pkg/vm/engine/tae/db/merge/policyGroup.go similarity index 55% rename from pkg/vm/engine/tae/db/merge/policyBasic.go rename to pkg/vm/engine/tae/db/merge/policyGroup.go index c3ac00c397ae0..d90e7acc56004 100644 --- a/pkg/vm/engine/tae/db/merge/policyBasic.go +++ b/pkg/vm/engine/tae/db/merge/policyGroup.go @@ -15,10 +15,7 @@ package merge import ( - "cmp" "context" - "slices" - "time" pkgcatalog "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/container/types" @@ -32,7 +29,7 @@ import ( type reviseResult struct { objs []*catalog.ObjectEntry - kind TaskHostKind + kind taskHostKind } type policyGroup struct { @@ -57,10 +54,10 @@ func (g *policyGroup) onObject(obj *catalog.ObjectEntry) { } } -func (g *policyGroup) revise(cpu, mem int64) []reviseResult { +func (g *policyGroup) revise(rc *resourceController) []reviseResult { results := make([]reviseResult, 0, len(g.policies)) for _, p := range g.policies { - pResult := p.revise(cpu, mem, g.config) + pResult := p.revise(rc) for _, r := range pResult { if len(r.objs) > 0 { results = append(results, r) @@ -71,10 +68,10 @@ func (g *policyGroup) revise(cpu, mem int64) []reviseResult { } func (g *policyGroup) resetForTable(entry *catalog.TableEntry) { + g.config = g.configProvider.getConfig(entry) for _, p := range g.policies { - p.resetForTable(entry) + p.resetForTable(entry, g.config) } - g.config = g.configProvider.getConfig(entry) } func (g *policyGroup) setConfig(tbl *catalog.TableEntry, txn txnif.AsyncTxn, cfg *BasicPolicyConfig) (err error) { @@ -166,7 +163,7 @@ func (g *policyGroup) setConfig(tbl *catalog.TableEntry, txn txnif.AsyncTxn, cfg } return tblHandle.AlterTable( ctx, - NewUpdatePolicyReq(cfg), + newUpdatePolicyReq(cfg), ) } @@ -182,159 +179,3 @@ func (g *policyGroup) getConfig(tbl *catalog.TableEntry) *BasicPolicyConfig { } return r } - -type basic struct { - schema *catalog.Schema - objects []*catalog.ObjectEntry - - lastMergeTime time.Time - - objectsSize int - accBuf []int -} - -func newBasicPolicy() policy { - return &basic{ - objects: make([]*catalog.ObjectEntry, 0, 16), - accBuf: make([]int, 1, 32), - } -} - -// impl policy for Basic -func (o *basic) onObject(obj *catalog.ObjectEntry, config *BasicPolicyConfig) bool { - if obj.IsTombstone { - return false - } - - osize := int(obj.OriginSize()) - - isCandidate := func() bool { - if len(o.objects) >= config.MergeMaxOneRun { - return false - } - if osize < int(config.ObjectMinOsize) { - if o.objectsSize > 2*common.DefaultMaxOsizeObjMB*common.Const1MBytes { - return false - } - o.objectsSize += osize - return true - } - // skip big object as an insurance - if osize > 110*common.Const1MBytes { - return false - } - - return false - } - - if isCandidate() { - o.objects = append(o.objects, obj) - return true - } - return false -} - -func (o *basic) revise(cpu, mem int64, config *BasicPolicyConfig) []reviseResult { - slices.SortFunc(o.objects, func(a, b *catalog.ObjectEntry) int { - return cmp.Compare(a.Rows(), b.Rows()) - }) - objs := o.objects - - isStandalone := common.IsStandaloneBoost.Load() - mergeOnDNIfStandalone := !common.ShouldStandaloneCNTakeOver.Load() - - dnobjs := controlMem(objs, mem) - dnobjs = o.optimize(dnobjs, config) - - dnosize, _ := estimateMergeConsume(dnobjs) - - schedDN := func() []reviseResult { - if cpu > 85 { - if dnosize > 25*common.Const1MBytes { - logutil.Infof("mergeblocks skip big merge for high level cpu usage, %d", cpu) - return nil - } - } - if len(dnobjs) > 1 { - return []reviseResult{{dnobjs, TaskHostDN}} - } - 
return nil - } - - schedCN := func() []reviseResult { - cnobjs := controlMem(objs, int64(common.RuntimeCNMergeMemControl.Load())) - cnobjs = o.optimize(cnobjs, config) - return []reviseResult{{cnobjs, TaskHostCN}} - } - - if isStandalone && mergeOnDNIfStandalone { - return schedDN() - } - - // CNs come into the picture in two cases: - // 1.cluster deployed - // 2.standalone deployed but it's asked to merge on cn - if common.RuntimeCNTakeOverAll.Load() || dnosize > int(common.RuntimeMinCNMergeSize.Load()) { - return schedCN() - } - - // CNs don't take over the task, leave it on dn. - return schedDN() -} - -func (o *basic) optimize(objs []*catalog.ObjectEntry, config *BasicPolicyConfig) []*catalog.ObjectEntry { - // objs are sorted by remaining rows - o.accBuf = o.accBuf[:1] - for i, obj := range objs { - o.accBuf = append(o.accBuf, o.accBuf[i]+int(obj.Rows())) - } - acc := o.accBuf - - isBigGap := func(small, big int) bool { - if big < int(o.schema.Extra.BlockMaxRows) { - return false - } - return big-small > 3*small - } - - var i int - // skip merging objects with big row count gaps, 3x and more - for i = len(acc) - 1; i > 1 && isBigGap(acc[i-1], acc[i]); i-- { - } - - readyToMergeRows := acc[i] - - // avoid frequent small object merge - if readyToMergeRows < int(o.schema.Extra.BlockMaxRows) && - !o.lastMergeTime.Before(time.Now().Add(-constSmallMergeGap)) && - i < config.MergeMaxOneRun { - return nil - } - - objs = objs[:i] - - return objs -} - -func controlMem(objs []*catalog.ObjectEntry, mem int64) []*catalog.ObjectEntry { - if mem > constMaxMemCap { - mem = constMaxMemCap - } - - needPopout := func(ss []*catalog.ObjectEntry) bool { - _, esize := estimateMergeConsume(ss) - return esize > int(2*mem/3) - } - for needPopout(objs) { - objs = objs[:len(objs)-1] - } - - return objs -} - -func (o *basic) resetForTable(entry *catalog.TableEntry) { - o.schema = entry.GetLastestSchemaLocked(false) - o.lastMergeTime = entry.Stats.GetLastMergeTime() - o.objects = o.objects[:0] - o.objectsSize = 0 -} diff --git a/pkg/vm/engine/tae/db/merge/policyOverlap.go b/pkg/vm/engine/tae/db/merge/policyOverlap.go index a12c5a895ba90..585c5a3adefeb 100644 --- a/pkg/vm/engine/tae/db/merge/policyOverlap.go +++ b/pkg/vm/engine/tae/db/merge/policyOverlap.go @@ -15,28 +15,28 @@ package merge import ( - "cmp" "slices" + "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/compute" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" ) var _ policy = (*objOverlapPolicy)(nil) +var levels = [6]int{ + 1, 2, 4, 16, 64, 256, +} + type objOverlapPolicy struct { - objects []*catalog.ObjectEntry - objectsSize int + leveledObjects [len(levels)][]*catalog.ObjectEntry - overlappingObjsSet [][]*catalog.ObjectEntry + segments map[objectio.Segmentid]map[*catalog.ObjectEntry]struct{} } func newObjOverlapPolicy() *objOverlapPolicy { return &objOverlapPolicy{ - objects: make([]*catalog.ObjectEntry, 0), - overlappingObjsSet: make([][]*catalog.ObjectEntry, 0), + segments: make(map[objectio.Segmentid]map[*catalog.ObjectEntry]struct{}), } } @@ -44,117 +44,126 @@ func (m *objOverlapPolicy) onObject(obj *catalog.ObjectEntry, config *BasicPolic if obj.IsTombstone { return false } - if obj.OriginSize() < config.ObjectMinOsize { - return false - } if !obj.SortKeyZoneMap().IsInited() { return false } - if m.objectsSize > 
10*common.DefaultMaxOsizeObjMB*common.Const1MBytes { - return false + if m.segments[obj.ObjectName().SegmentId()] == nil { + m.segments[obj.ObjectName().SegmentId()] = make(map[*catalog.ObjectEntry]struct{}) } - m.objects = append(m.objects, obj) - m.objectsSize += int(obj.OriginSize()) + m.segments[obj.ObjectName().SegmentId()][obj] = struct{}{} return true } -func (m *objOverlapPolicy) revise(cpu, mem int64, config *BasicPolicyConfig) []reviseResult { - if len(m.objects) < 2 { - return nil - } - if cpu > 80 { - return nil - } - objs, taskHostKind := m.reviseDataObjs(config) - objs = controlMem(objs, mem) - if len(objs) > 1 { - return []reviseResult{{objs, taskHostKind}} - } - return nil -} - -func (m *objOverlapPolicy) reviseDataObjs(config *BasicPolicyConfig) ([]*catalog.ObjectEntry, TaskHostKind) { - slices.SortFunc(m.objects, func(a, b *catalog.ObjectEntry) int { - zmA := a.SortKeyZoneMap() - zmB := b.SortKeyZoneMap() - if c := zmA.CompareMin(zmB); c != 0 { - return c - } - return zmA.CompareMax(zmB) - }) - set := entrySet{entries: make([]*catalog.ObjectEntry, 0), maxValue: []byte{}} - for _, obj := range m.objects { - if len(set.entries) == 0 { - set.add(obj) - continue +func (m *objOverlapPolicy) revise(rc *resourceController) []reviseResult { + for _, objects := range m.segments { + l := segLevel(len(objects)) + for obj := range objects { + m.leveledObjects[l] = append(m.leveledObjects[l], obj) } + } - if zm := obj.SortKeyZoneMap(); index.StrictlyCompareZmMaxAndMin(set.maxValue, zm.GetMinBuf(), zm.GetType(), zm.GetScale(), zm.GetScale()) > 0 { - // zm is overlapped - set.add(obj) + reviseResults := make([]reviseResult, 0, len(levels)) + for i := range 4 { + if len(m.leveledObjects[i]) < 2 { continue } - // obj is not added in the set. - // either dismiss the set or add the set in m.overlappingObjsSet - if len(set.entries) > 1 { - objs := make([]*catalog.ObjectEntry, len(set.entries)) - copy(objs, set.entries) - m.overlappingObjsSet = append(m.overlappingObjsSet, objs) + for _, objs := range objectsWithGivenOverlaps(m.leveledObjects[i], 5) { + objs = removeOversize(objs) + if len(objs) < 2 || score(objs) < 1.1 { + continue + } + result := reviseResult{objs: objs, kind: taskHostDN} + if result.kind == taskHostDN { + if rc.cpuPercent > 80 { + continue + } + + if rc.resourceAvailable(result.objs) { + rc.reserveResources(result.objs) + } else { + result.kind = taskHostCN + } + } + reviseResults = append(reviseResults, result) } - - set.reset() - set.add(obj) } - // there is still more than one entry in set. - if len(set.entries) > 1 { - objs := make([]*catalog.ObjectEntry, len(set.entries)) - copy(objs, set.entries) - m.overlappingObjsSet = append(m.overlappingObjsSet, objs) - set.reset() - } - if len(m.overlappingObjsSet) == 0 { - return nil, TaskHostDN - } - - slices.SortFunc(m.overlappingObjsSet, func(a, b []*catalog.ObjectEntry) int { - return cmp.Compare(len(a), len(b)) - }) + return reviseResults +} - // get the overlapping set with most objs. 
- objs := m.overlappingObjsSet[len(m.overlappingObjsSet)-1] - if len(objs) < 2 { - return nil, TaskHostDN +func (m *objOverlapPolicy) resetForTable(*catalog.TableEntry, *BasicPolicyConfig) { + for i := range m.leveledObjects { + m.leveledObjects[i] = m.leveledObjects[i][:0] } - if len(objs) > config.MergeMaxOneRun { - objs = objs[:config.MergeMaxOneRun] - } - return objs, TaskHostDN + clear(m.segments) } -func (m *objOverlapPolicy) resetForTable(*catalog.TableEntry) { - m.objects = m.objects[:0] - m.overlappingObjsSet = m.overlappingObjsSet[:0] - m.objectsSize = 0 +func segLevel(length int) int { + l := len(levels) - 1 + for i, level := range levels { + if length < level { + l = i - 1 + break + } + } + return l } -type entrySet struct { - entries []*catalog.ObjectEntry - maxValue []byte - size int -} +type endPoint struct { + val []byte + s int -func (s *entrySet) reset() { - s.entries = s.entries[:0] - s.maxValue = []byte{} - s.size = 0 + obj *catalog.ObjectEntry } -func (s *entrySet) add(obj *catalog.ObjectEntry) { - s.entries = append(s.entries, obj) - s.size += int(obj.OriginSize()) - if zm := obj.SortKeyZoneMap(); len(s.maxValue) == 0 || - compute.Compare(s.maxValue, zm.GetMaxBuf(), zm.GetType(), zm.GetScale(), zm.GetScale()) < 0 { - s.maxValue = zm.GetMaxBuf() +func objectsWithGivenOverlaps(objects []*catalog.ObjectEntry, overlaps int) [][]*catalog.ObjectEntry { + if len(objects) < 2 { + return nil + } + points := make([]endPoint, 0, 2*len(objects)) + for _, obj := range objects { + zm := obj.SortKeyZoneMap() + points = append(points, endPoint{val: zm.GetMinBuf(), s: 1, obj: obj}) + points = append(points, endPoint{val: zm.GetMaxBuf(), s: -1, obj: obj}) + } + slices.SortFunc(points, func(a, b endPoint) int { + c := compute.Compare(a.val, b.val, objects[0].SortKeyZoneMap().GetType(), + a.obj.SortKeyZoneMap().GetScale(), b.obj.SortKeyZoneMap().GetScale()) + if c != 0 { + return c + } + // left node is first + return -a.s + }) + + globalMax := 0 + + res := make([][]*catalog.ObjectEntry, 0) + tmp := make(map[*catalog.ObjectEntry]struct{}) + for { + objs := make([]*catalog.ObjectEntry, 0, len(points)/2) + clear(tmp) + for _, p := range points { + if p.s == 1 { + tmp[p.obj] = struct{}{} + } else { + delete(tmp, p.obj) + } + if len(tmp) > globalMax { + globalMax = len(tmp) + objs = objs[:0] + for obj := range tmp { + objs = append(objs, obj) + } + } + } + if len(objs) < overlaps { + return res + } + res = append(res, objs) + points = slices.DeleteFunc(points, func(point endPoint) bool { + return slices.Contains(objs, point.obj) + }) + globalMax = 0 } } diff --git a/pkg/vm/engine/tae/db/merge/policyTombstone.go b/pkg/vm/engine/tae/db/merge/policyTombstone.go index f36f7a04b28ad..f5e89d01e7edd 100644 --- a/pkg/vm/engine/tae/db/merge/policyTombstone.go +++ b/pkg/vm/engine/tae/db/merge/policyTombstone.go @@ -15,9 +15,13 @@ package merge import ( + "slices" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" ) +var _ policy = (*tombstonePolicy)(nil) + type tombstonePolicy struct { tombstones []*catalog.ObjectEntry } @@ -33,19 +37,19 @@ func (t *tombstonePolicy) onObject(entry *catalog.ObjectEntry, config *BasicPoli return true } -func (t *tombstonePolicy) revise(cpu, mem int64, config *BasicPolicyConfig) []reviseResult { +func (t *tombstonePolicy) revise(*resourceController) []reviseResult { if len(t.tombstones) < 2 { return nil } - return []reviseResult{{t.tombstones, TaskHostDN}} + return []reviseResult{{slices.Clone(t.tombstones), taskHostDN}} } -func (t *tombstonePolicy) 
resetForTable(*catalog.TableEntry) { +func (t *tombstonePolicy) resetForTable(*catalog.TableEntry, *BasicPolicyConfig) { t.tombstones = t.tombstones[:0] } func newTombstonePolicy() policy { return &tombstonePolicy{ - tombstones: make([]*catalog.ObjectEntry, 0), + tombstones: make([]*catalog.ObjectEntry, 0, 5), } } diff --git a/pkg/vm/engine/tae/db/merge/policy_test.go b/pkg/vm/engine/tae/db/merge/policy_test.go index 9759150a6c1bc..96a663e1f218f 100644 --- a/pkg/vm/engine/tae/db/merge/policy_test.go +++ b/pkg/vm/engine/tae/db/merge/policy_test.go @@ -17,20 +17,26 @@ package merge import ( "context" "math" + "math/rand/v2" "testing" - "github.com/stretchr/testify/require" - + "github.com/matrixorigin/matrixone/pkg/common/mpool" + "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/pb/api" + "github.com/matrixorigin/matrixone/pkg/testutil" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/txn/txnbase" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func testConfig(objectMinOSize uint32, maxOneRun int) *BasicPolicyConfig { @@ -44,7 +50,7 @@ func newSortedDataEntryWithTableEntry(t *testing.T, tbl *catalog.TableEntry, txn zm := index.NewZM(types.T_int32, 0) index.UpdateZM(zm, types.EncodeInt32(&v1)) index.UpdateZM(zm, types.EncodeInt32(&v2)) - stats := objectio.NewObjectStats() + stats := objectio.NewObjectStatsWithObjectID(objectio.NewObjectid(), false, true, false) require.NoError(t, objectio.SetObjectStatsSortKeyZoneMap(stats, zm)) require.NoError(t, objectio.SetObjectStatsOriginSize(stats, size)) require.NoError(t, objectio.SetObjectStatsRowCnt(stats, 2)) @@ -69,11 +75,13 @@ func newSortedTombstoneEntryWithTableEntry(t *testing.T, tbl *catalog.TableEntry return entry } -func newSortedTestObjectEntry(t *testing.T, v1, v2 int32, size uint32) *catalog.ObjectEntry { +func newSortedTestObjectEntry(t testing.TB, v1, v2 int32, size uint32) *catalog.ObjectEntry { zm := index.NewZM(types.T_int32, 0) index.UpdateZM(zm, types.EncodeInt32(&v1)) index.UpdateZM(zm, types.EncodeInt32(&v2)) stats := objectio.NewObjectStats() + objName := objectio.BuildObjectNameWithObjectID(objectio.NewObjectid()) + require.NoError(t, objectio.SetObjectStatsObjectName(stats, objName)) require.NoError(t, objectio.SetObjectStatsSortKeyZoneMap(stats, zm)) require.NoError(t, objectio.SetObjectStatsOriginSize(stats, size)) require.NoError(t, objectio.SetObjectStatsRowCnt(stats, 2)) @@ -82,14 +90,18 @@ func newSortedTestObjectEntry(t *testing.T, v1, v2 int32, size uint32) *catalog. 
} } -func newTestObjectEntryWithRowCnt(t *testing.T, size, rowCnt uint32, isTombstone bool) *catalog.ObjectEntry { +func newTestVarcharObjectEntry(t testing.TB, v1, v2 string, size uint32) *catalog.ObjectEntry { + zm := index.NewZM(types.T_varchar, 0) + index.UpdateZM(zm, []byte(v1)) + index.UpdateZM(zm, []byte(v2)) stats := objectio.NewObjectStats() + objName := objectio.BuildObjectNameWithObjectID(objectio.NewObjectid()) + require.NoError(t, objectio.SetObjectStatsObjectName(stats, objName)) + require.NoError(t, objectio.SetObjectStatsSortKeyZoneMap(stats, zm)) require.NoError(t, objectio.SetObjectStatsOriginSize(stats, size)) - require.NoError(t, objectio.SetObjectStatsRowCnt(stats, rowCnt)) - + require.NoError(t, objectio.SetObjectStatsRowCnt(stats, 2)) return &catalog.ObjectEntry{ ObjectMVCCNode: catalog.ObjectMVCCNode{ObjectStats: *stats}, - ObjectNode: catalog.ObjectNode{IsTombstone: isTombstone}, } } @@ -103,101 +115,57 @@ func newTestObjectEntry(t *testing.T, size uint32, isTombstone bool) *catalog.Ob } } -func TestPolicyBasic(t *testing.T) { - common.IsStandaloneBoost.Store(true) - p := newBasicPolicy() - - // only schedule objects whose size < cfg.objectMinOSize - p.resetForTable(catalog.MockStaloneTableEntry(0, &catalog.Schema{Extra: &api.SchemaExtra{BlockMaxRows: options.DefaultBlockMaxRows}})) - cfg := testConfig(100, 3) - require.True(t, p.onObject(newTestObjectEntry(t, 10, false), cfg)) - require.True(t, p.onObject(newTestObjectEntry(t, 20, false), cfg)) - require.False(t, p.onObject(newTestObjectEntry(t, 120, false), cfg)) - result := p.revise(0, math.MaxInt64, cfg) - require.Equal(t, 1, len(result)) - require.Equal(t, 2, len(result[0].objs)) - require.Equal(t, TaskHostDN, result[0].kind) - - // only schedule objects less than cfg.maxOneRun - p.resetForTable(catalog.MockStaloneTableEntry(1, &catalog.Schema{Extra: &api.SchemaExtra{BlockMaxRows: options.DefaultBlockMaxRows}})) - cfg = testConfig(100, 2) - require.True(t, p.onObject(newTestObjectEntry(t, 10, false), cfg)) - require.True(t, p.onObject(newTestObjectEntry(t, 20, false), cfg)) - require.False(t, p.onObject(newTestObjectEntry(t, 30, false), cfg)) - result = p.revise(0, math.MaxInt64, cfg) - require.Equal(t, 1, len(result)) - require.Equal(t, 2, len(result[0].objs)) - require.Equal(t, TaskHostDN, result[0].kind) - - // basic policy do not schedule tombstones - p.resetForTable(catalog.MockStaloneTableEntry(2, &catalog.Schema{Extra: &api.SchemaExtra{BlockMaxRows: options.DefaultBlockMaxRows}})) - cfg = testConfig(100, 2) - require.False(t, p.onObject(newTestObjectEntry(t, 10, true), cfg)) - require.False(t, p.onObject(newTestObjectEntry(t, 20, true), cfg)) - result = p.revise(0, math.MaxInt64, cfg) - require.Equal(t, 0, len(result)) - - // memory limit - p.resetForTable(catalog.MockStaloneTableEntry(2, &catalog.Schema{Extra: &api.SchemaExtra{BlockMaxRows: options.DefaultBlockMaxRows}})) - cfg = testConfig(100, 3) - require.True(t, p.onObject(newTestObjectEntryWithRowCnt(t, 10, 1, false), cfg)) - require.True(t, p.onObject(newTestObjectEntryWithRowCnt(t, 20, 1, false), cfg)) - require.True(t, p.onObject(newTestObjectEntryWithRowCnt(t, 20, 1, false), cfg)) - result = p.revise(0, 36, cfg) - require.Equal(t, 1, len(result)) - require.Equal(t, 2, len(result[0].objs)) - require.Equal(t, TaskHostDN, result[0].kind) -} - func TestPolicyTombstone(t *testing.T) { common.IsStandaloneBoost.Store(true) p := newTombstonePolicy() + rc := new(resourceController) // tombstone policy do not schedule data objects - 
p.resetForTable(catalog.MockStaloneTableEntry(0, &catalog.Schema{Extra: &api.SchemaExtra{BlockMaxRows: options.DefaultBlockMaxRows}})) + p.resetForTable(catalog.MockStaloneTableEntry(0, &catalog.Schema{Extra: &api.SchemaExtra{BlockMaxRows: options.DefaultBlockMaxRows}}), nil) cfg := testConfig(100, 2) require.False(t, p.onObject(newTestObjectEntry(t, 10, false), cfg)) require.False(t, p.onObject(newTestObjectEntry(t, 20, false), cfg)) - result := p.revise(0, math.MaxInt64, cfg) + result := p.revise(rc) require.Equal(t, 0, len(result)) - p.resetForTable(catalog.MockStaloneTableEntry(0, &catalog.Schema{Extra: &api.SchemaExtra{BlockMaxRows: options.DefaultBlockMaxRows}})) + p.resetForTable(catalog.MockStaloneTableEntry(0, &catalog.Schema{Extra: &api.SchemaExtra{BlockMaxRows: options.DefaultBlockMaxRows}}), nil) cfg = testConfig(100, 2) require.True(t, p.onObject(newTestObjectEntry(t, 10, true), cfg)) require.True(t, p.onObject(newTestObjectEntry(t, 20, true), cfg)) - result = p.revise(0, math.MaxInt64, cfg) + result = p.revise(rc) require.Equal(t, 1, len(result)) require.Equal(t, 2, len(result[0].objs)) - require.Equal(t, TaskHostDN, result[0].kind) + require.Equal(t, taskHostDN, result[0].kind) // only schedule objects less than cfg.maxOneRun - p.resetForTable(catalog.MockStaloneTableEntry(0, &catalog.Schema{Extra: &api.SchemaExtra{BlockMaxRows: options.DefaultBlockMaxRows}})) + p.resetForTable(catalog.MockStaloneTableEntry(0, &catalog.Schema{Extra: &api.SchemaExtra{BlockMaxRows: options.DefaultBlockMaxRows}}), nil) cfg = testConfig(100, 2) require.True(t, p.onObject(newTestObjectEntry(t, 10, true), cfg)) require.True(t, p.onObject(newTestObjectEntry(t, 20, true), cfg)) require.False(t, p.onObject(newTestObjectEntry(t, 30, true), cfg)) - result = p.revise(0, math.MaxInt64, cfg) + result = p.revise(rc) require.Equal(t, 1, len(result)) require.Equal(t, 2, len(result[0].objs)) - require.Equal(t, TaskHostDN, result[0].kind) + require.Equal(t, taskHostDN, result[0].kind) // tombstone do not consider size limit - p.resetForTable(catalog.MockStaloneTableEntry(0, &catalog.Schema{Extra: &api.SchemaExtra{BlockMaxRows: options.DefaultBlockMaxRows}})) + p.resetForTable(catalog.MockStaloneTableEntry(0, &catalog.Schema{Extra: &api.SchemaExtra{BlockMaxRows: options.DefaultBlockMaxRows}}), nil) cfg = testConfig(100, 3) require.True(t, p.onObject(newTestObjectEntry(t, 10, true), cfg)) require.True(t, p.onObject(newTestObjectEntry(t, 20, true), cfg)) require.True(t, p.onObject(newTestObjectEntry(t, 120, true), cfg)) - result = p.revise(0, math.MaxInt64, cfg) + result = p.revise(rc) require.Equal(t, 1, len(result)) require.Equal(t, 3, len(result[0].objs)) - require.Equal(t, TaskHostDN, result[0].kind) + require.Equal(t, taskHostDN, result[0].kind) } func TestPolicyGroup(t *testing.T) { common.IsStandaloneBoost.Store(true) - g := newPolicyGroup(newBasicPolicy(), newTombstonePolicy()) + g := newPolicyGroup(newTombstonePolicy()) g.resetForTable(catalog.MockStaloneTableEntry(0, &catalog.Schema{Extra: &api.SchemaExtra{BlockMaxRows: options.DefaultBlockMaxRows}})) g.config = &BasicPolicyConfig{MergeMaxOneRun: 2, ObjectMinOsize: 100} + rc := new(resourceController) g.onObject(newTestObjectEntry(t, 10, false)) g.onObject(newTestObjectEntry(t, 20, false)) @@ -206,13 +174,11 @@ func TestPolicyGroup(t *testing.T) { g.onObject(newTestObjectEntry(t, 20, true)) g.onObject(newTestObjectEntry(t, 30, true)) - results := g.revise(0, math.MaxInt64) - require.Equal(t, 2, len(results)) - require.Equal(t, TaskHostDN, 
results[0].kind) - require.Equal(t, TaskHostDN, results[1].kind) + results := g.revise(rc) + require.Equal(t, 1, len(results)) + require.Equal(t, taskHostDN, results[0].kind) require.Equal(t, 2, len(results[0].objs)) - require.Equal(t, 2, len(results[1].objs)) } const overlapSizeThreshold = common.DefaultMinOsizeQualifiedMB * common.Const1MBytes @@ -221,43 +187,48 @@ func TestObjOverlap(t *testing.T) { // empty policy policy := newObjOverlapPolicy() - objs := policy.revise(0, math.MaxInt64, defaultBasicConfig) - require.Equal(t, 0, len(objs)) + rc := new(resourceController) + rc.setMemLimit(estimateMemUsagePerRow * 20) + objs := policy.revise(rc) + for _, obj := range objs { + require.Equal(t, 0, len(obj.objs)) + } - policy.resetForTable(nil) + policy.resetForTable(nil, nil) // no overlap entry1 := newSortedTestObjectEntry(t, 1, 2, overlapSizeThreshold) entry2 := newSortedTestObjectEntry(t, 3, 4, overlapSizeThreshold) require.True(t, policy.onObject(entry1, defaultBasicConfig)) require.True(t, policy.onObject(entry2, defaultBasicConfig)) - objs = policy.revise(0, math.MaxInt64, defaultBasicConfig) - require.Equal(t, 0, len(objs)) + objs = policy.revise(rc) + for _, obj := range objs { + require.Equal(t, 0, len(obj.objs)) + } - policy.resetForTable(nil) + policy.resetForTable(nil, nil) // overlap entry3 := newSortedTestObjectEntry(t, 1, 4, overlapSizeThreshold) entry4 := newSortedTestObjectEntry(t, 2, 3, overlapSizeThreshold) require.True(t, policy.onObject(entry3, defaultBasicConfig)) require.True(t, policy.onObject(entry4, defaultBasicConfig)) - objs = policy.revise(0, math.MaxInt64, defaultBasicConfig) - require.Equal(t, 1, len(objs)) - require.Equal(t, 2, len(objs[0].objs)) - require.Equal(t, TaskHostDN, objs[0].kind) - - policy.resetForTable(nil) + objs = policy.revise(rc) + require.Zero(t, len(objs)) + policy.resetForTable(nil, nil) // entry is not sorted entry5 := newTestObjectEntry(t, overlapSizeThreshold, false) entry6 := newTestObjectEntry(t, overlapSizeThreshold, false) require.False(t, policy.onObject(entry5, defaultBasicConfig)) require.False(t, policy.onObject(entry6, defaultBasicConfig)) - require.Equal(t, 0, len(policy.objects)) - objs = policy.revise(0, math.MaxInt64, defaultBasicConfig) - require.Equal(t, 0, len(objs)) + require.Equal(t, 6, len(policy.leveledObjects)) + objs = policy.revise(rc) + for _, obj := range objs { + require.Equal(t, 0, len(obj.objs)) + } - policy.resetForTable(nil) + policy.resetForTable(nil, nil) // two overlap set: // {entry7, entry8} @@ -275,12 +246,10 @@ func TestObjOverlap(t *testing.T) { require.True(t, policy.onObject(entry10, defaultBasicConfig)) require.True(t, policy.onObject(entry11, defaultBasicConfig)) - objs = policy.revise(0, math.MaxInt64, defaultBasicConfig) - require.Equal(t, 1, len(objs)) - require.Equal(t, 3, len(objs[0].objs)) - require.Equal(t, TaskHostDN, objs[0].kind) + objs = policy.revise(rc) + require.Zero(t, len(objs)) - policy.resetForTable(nil) + policy.resetForTable(nil, nil) // no enough memory entry12 := newSortedTestObjectEntry(t, 1, 4, overlapSizeThreshold) @@ -289,16 +258,19 @@ func TestObjOverlap(t *testing.T) { require.True(t, policy.onObject(entry12, defaultBasicConfig)) require.True(t, policy.onObject(entry13, defaultBasicConfig)) - objs = policy.revise(0, 0, defaultBasicConfig) - require.Equal(t, 0, len(objs)) + objs = policy.revise(rc) + for _, obj := range objs { + require.Equal(t, 0, len(obj.objs)) + } - policy.resetForTable(nil) + policy.resetForTable(nil, nil) } func TestPolicyCompact(t *testing.T) { 
fs, err := fileservice.NewMemoryFS("memory", fileservice.DisabledCacheConfig, nil) require.NoError(t, err) p := newObjCompactPolicy(fs) + rc := new(resourceController) cata := catalog.MockCatalog() defer cata.Close() @@ -313,9 +285,15 @@ func TestPolicyCompact(t *testing.T) { require.NoError(t, err) require.NoError(t, txn1.Commit(context.Background())) - p.resetForTable(tbl) + obj := catalog.MockObjEntryWithTbl(tbl, math.MaxUint32, false) + tombstone := catalog.MockObjEntryWithTbl(tbl, math.MaxUint32, true) + require.NoError(t, objectio.SetObjectStatsOriginSize(tombstone.GetObjectStats(), math.MaxUint32)) + tbl.AddEntryLocked(obj) + tbl.AddEntryLocked(tombstone) + p.resetForTable(tbl, &BasicPolicyConfig{}) + p.onObject(obj, &BasicPolicyConfig{}) - objs := p.revise(0, math.MaxInt64, defaultBasicConfig) + objs := p.revise(rc) require.Equal(t, 0, len(objs)) txn2, _ := txnMgr.StartTxn(nil) @@ -347,7 +325,7 @@ func Test_timeout(t *testing.T) { require.NoError(t, err) require.NoError(t, txn1.Commit(context.Background())) - p.resetForTable(tbl) + p.resetForTable(tbl, defaultBasicConfig) txn3, _ := txnMgr.StartTxn(nil) ent3 := newSortedTombstoneEntryWithTableEntry(t, tbl, txn3, types.Rowid{0}, types.Rowid{1}) @@ -357,8 +335,120 @@ func Test_timeout(t *testing.T) { copy(originSizes, minSizeBytes) require.NoError(t, txn3.Commit(context.Background())) - p.tombstoneStats = []objectio.ObjectStats{ - ent3.ObjectStats, - } require.False(t, p.onObject(ent3, defaultBasicConfig)) } + +func TestSegLevel(t *testing.T) { + require.Equal(t, 0, segLevel(1)) + require.Equal(t, 1, segLevel(2)) + require.Equal(t, 1, segLevel(3)) + require.Equal(t, 2, segLevel(4)) + require.Equal(t, 2, segLevel(5)) + require.Equal(t, 2, segLevel(6)) + require.Equal(t, 2, segLevel(14)) + require.Equal(t, 2, segLevel(15)) + require.Equal(t, 3, segLevel(16)) + require.Equal(t, 3, segLevel(17)) + require.Equal(t, 3, segLevel(63)) + require.Equal(t, 4, segLevel(64)) + require.Equal(t, 4, segLevel(65)) + require.Equal(t, 4, segLevel(255)) + require.Equal(t, 5, segLevel(256)) + require.Equal(t, 5, segLevel(257)) +} + +func TestCheckTombstone(t *testing.T) { + mp := mpool.MustNewZero() + + fs := testutil.NewSharedFS() + + rowCnt := 100 + ssCnt := 2 + + rowids := make([]types.Rowid, rowCnt) + metas := make([]objectio.ObjectDataMeta, ssCnt) + for i := 0; i < ssCnt; i++ { + writer := blockio.ConstructTombstoneWriter(objectio.HiddenColumnSelection_None, fs) + assert.NotNil(t, writer) + + bat := batch.NewWithSize(2) + bat.Vecs[0] = vector.NewVec(types.T_Rowid.ToType()) + bat.Vecs[1] = vector.NewVec(types.T_int32.ToType()) + + for j := 0; j < rowCnt; j++ { + row := types.RandomRowid() + rowids[j] = row + pk := rand.Int() + + err := vector.AppendFixed[types.Rowid](bat.Vecs[0], row, false, mp) + require.NoError(t, err) + + err = vector.AppendFixed[int32](bat.Vecs[1], int32(pk), false, mp) + require.NoError(t, err) + } + + _, err := writer.WriteBatch(bat) + require.NoError(t, err) + + _, _, err = writer.Sync(context.Background()) + require.NoError(t, err) + + ss := writer.GetObjectStats() + require.Equal(t, rowCnt, int(ss.Rows())) + meta, err := loadTombstoneMeta(&ss, fs) + require.NoError(t, err) + metas[i] = meta + } + for _, rowID := range rowids { + id := rowID.BorrowObjectID() + for i := range metas { + ok := checkTombstoneMeta(metas[i], id) + if i == 0 { + require.False(t, ok) + } else { + require.True(t, ok) + } + } + } +} + +func TestObjectsWithMaximumOverlaps(t *testing.T) { + o1 := newSortedTestObjectEntry(t, 0, 50, math.MaxInt32) + o2 := 
newSortedTestObjectEntry(t, 51, 100, math.MaxInt32) + o3 := newSortedTestObjectEntry(t, 49, 52, math.MaxInt32) + o4 := newSortedTestObjectEntry(t, 1, 52, math.MaxInt32) + o5 := newSortedTestObjectEntry(t, 50, 51, math.MaxInt32) + o6 := newSortedTestObjectEntry(t, 55, 60, math.MaxInt32) + + res1 := objectsWithGivenOverlaps([]*catalog.ObjectEntry{o1, o2}, 2) + require.Equal(t, 0, len(res1)) + + res2 := objectsWithGivenOverlaps([]*catalog.ObjectEntry{o1, o3}, 2) + require.Equal(t, 1, len(res2)) + require.ElementsMatch(t, []*catalog.ObjectEntry{o1, o3}, res2[0]) + + res3 := objectsWithGivenOverlaps([]*catalog.ObjectEntry{o2, o3}, 2) + require.Equal(t, 1, len(res3)) + require.ElementsMatch(t, []*catalog.ObjectEntry{o2, o3}, res3[0]) + + res4 := objectsWithGivenOverlaps([]*catalog.ObjectEntry{o1, o2, o3}, 2) + require.Equal(t, 1, len(res4)) + require.ElementsMatch(t, []*catalog.ObjectEntry{o1, o3}, res4[0]) + + res5 := objectsWithGivenOverlaps([]*catalog.ObjectEntry{o1, o2, o3}, 2) + require.Equal(t, 1, len(res5)) + require.ElementsMatch(t, []*catalog.ObjectEntry{o1, o3}, res5[0]) + + res6 := objectsWithGivenOverlaps([]*catalog.ObjectEntry{o1, o2, o3, o4}, 2) + require.Equal(t, 1, len(res6)) + require.ElementsMatch(t, []*catalog.ObjectEntry{o1, o3, o4}, res6[0]) + + res7 := objectsWithGivenOverlaps([]*catalog.ObjectEntry{o1, o5}, 2) + require.Equal(t, 1, len(res7)) + require.ElementsMatch(t, []*catalog.ObjectEntry{o1, o5}, res7[0]) + + res8 := objectsWithGivenOverlaps([]*catalog.ObjectEntry{o1, o2, o3, o4, o5, o6}, 2) + require.Equal(t, 2, len(res8)) + require.ElementsMatch(t, []*catalog.ObjectEntry{o1, o3, o4, o5}, res8[0]) + require.ElementsMatch(t, []*catalog.ObjectEntry{o2, o6}, res8[1]) +} diff --git a/pkg/vm/engine/tae/db/merge/scheduler.go b/pkg/vm/engine/tae/db/merge/scheduler.go index 6deccb16d0cce..1f7c685618251 100644 --- a/pkg/vm/engine/tae/db/merge/scheduler.go +++ b/pkg/vm/engine/tae/db/merge/scheduler.go @@ -17,6 +17,7 @@ package merge import ( "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/logutil" + "github.com/matrixorigin/matrixone/pkg/objectio" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" @@ -24,6 +25,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/dbutils" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif" dto "github.com/prometheus/client_model/go" + "time" ) type Scheduler struct { @@ -34,20 +36,21 @@ type Scheduler struct { executor *executor skipForTransPageLimit bool + + rc *resourceController } -func NewScheduler(rt *dbutils.Runtime, sched CNMergeScheduler) *Scheduler { - policySlice := make([]policy, 0, 4) - policySlice = append(policySlice, newBasicPolicy(), newObjCompactPolicy(rt.Fs.Service)) - if !common.RuntimeDisableZMBasedMerge.Load() { - policySlice = append(policySlice, newObjOverlapPolicy()) +func NewScheduler(rt *dbutils.Runtime, sched *CNMergeScheduler) *Scheduler { + policySlice := []policy{ + newObjOverlapPolicy(), + newObjCompactPolicy(rt.Fs.Service), + newTombstonePolicy(), } - policySlice = append(policySlice, newTombstonePolicy()) - op := &Scheduler{ LoopProcessor: new(catalog.LoopProcessor), policies: newPolicyGroup(policySlice...), executor: newMergeExecutor(rt, sched), + rc: new(resourceController), } op.DatabaseFn = op.onDataBase @@ -76,22 +79,24 @@ func (s *Scheduler) resetForTable(entry *catalog.TableEntry) { } func (s *Scheduler) 
PreExecute() error { - s.executor.refreshMemInfo() + s.rc.refresh() s.skipForTransPageLimit = false m := &dto.Metric{} - v2.TaskMergeTransferPageLengthGauge.Write(m) + if err := v2.TaskMergeTransferPageLengthGauge.Write(m); err != nil { + return err + } pagesize := m.GetGauge().GetValue() * 28 /*int32 + rowid(24b)*/ - if pagesize > float64(s.executor.transferPageSizeLimit()) { + if pagesize > float64(s.rc.transferPageLimit) { logutil.Infof("[mergeblocks] skip merge scanning due to transfer page %s, limit %s", common.HumanReadableBytes(int(pagesize)), - common.HumanReadableBytes(int(s.executor.transferPageSizeLimit()))) + common.HumanReadableBytes(int(s.rc.transferPageLimit))) s.skipForTransPageLimit = true } return nil } func (s *Scheduler) PostExecute() error { - s.executor.printStats() + s.rc.printStats() return nil } @@ -99,7 +104,7 @@ func (s *Scheduler) onDataBase(*catalog.DBEntry) (err error) { if StopMerge.Load() { return moerr.GetOkStopCurrRecur() } - if s.executor.memAvailBytes() < 100*common.Const1MBytes { + if s.rc.availableMem() < 100*common.Const1MBytes { return moerr.GetOkStopCurrRecur() } @@ -143,7 +148,7 @@ func (s *Scheduler) onPostTable(tableEntry *catalog.TableEntry) (err error) { } // delObjs := s.ObjectHelper.finish() - results := s.policies.revise(s.executor.CPUPercent(), int64(s.executor.memAvailBytes())) + results := s.policies.revise(s.rc) for _, r := range results { if len(r.objs) > 0 { s.executor.executeFor(tableEntry, r.objs, r.kind) @@ -191,18 +196,14 @@ func (s *Scheduler) StartMerge(tblID uint64, reentrant bool) error { return s.executor.rt.LockMergeService.UnlockFromUser(tblID, reentrant) } -func objectValid(objectEntry *catalog.ObjectEntry) bool { - if objectEntry.IsAppendable() { - return false - } - if !objectEntry.IsActive() { - return false - } - if !objectEntry.IsCommitted() { - return false - } - if objectEntry.IsCreatingOrAborted() { - return false - } - return true +func (s *Scheduler) CNActiveObjectsString() string { + return s.executor.cnSched.activeObjsString() +} + +func (s *Scheduler) RemoveCNActiveObjects(ids []objectio.ObjectId) { + s.executor.cnSched.removeActiveObject(ids) +} + +func (s *Scheduler) PruneCNActiveObjects(id uint64, ago time.Duration) { + s.executor.cnSched.prune(id, ago) } diff --git a/pkg/vm/engine/tae/db/merge/scheduler_test.go b/pkg/vm/engine/tae/db/merge/scheduler_test.go index 1d50194496c7b..a360b8d93af70 100644 --- a/pkg/vm/engine/tae/db/merge/scheduler_test.go +++ b/pkg/vm/engine/tae/db/merge/scheduler_test.go @@ -16,9 +16,11 @@ package merge import ( "context" + "github.com/matrixorigin/matrixone/pkg/common/runtime" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/pb/api" "github.com/matrixorigin/matrixone/pkg/pb/plan" + "github.com/matrixorigin/matrixone/pkg/taskservice" "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/dbutils" @@ -28,10 +30,15 @@ import ( ) func TestStopStartMerge(t *testing.T) { + memStorage := taskservice.NewMemTaskStorage() + cnScheduler := NewTaskServiceGetter(func() (taskservice.TaskService, bool) { + return taskservice.NewTaskService(runtime.DefaultRuntime(), memStorage), true + }) + scheduler := Scheduler{ executor: newMergeExecutor(&dbutils.Runtime{ LockMergeService: dbutils.NewLockMergeService(), - }, nil), + }, cnScheduler), } lockService := scheduler.executor.rt.LockMergeService @@ -76,6 +83,8 @@ func TestStopStartMerge(t 
*testing.T) { require.Error(t, scheduler.onTable(tblEntry1)) require.Error(t, scheduler.onTable(tblEntry2)) + require.Empty(t, scheduler.CNActiveObjectsString()) + scheduler.StartMerge(tblEntry1.GetID(), false) require.Equal(t, 1, len(lockService.LockedInfos())) scheduler.StartMerge(tblEntry2.GetID(), false) diff --git a/pkg/vm/engine/tae/db/merge/utils.go b/pkg/vm/engine/tae/db/merge/utils.go index a88a383a556d4..8327646f93978 100644 --- a/pkg/vm/engine/tae/db/merge/utils.go +++ b/pkg/vm/engine/tae/db/merge/utils.go @@ -15,27 +15,329 @@ package merge import ( + "cmp" + "context" + "math" + "os" + "slices" + "sync/atomic" "time" + "github.com/KimMachineGun/automemlimit/memlimit" + "github.com/matrixorigin/matrixone/pkg/common/moerr" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/fileservice" + "github.com/matrixorigin/matrixone/pkg/logutil" + "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/pb/api" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" + "github.com/shirou/gopsutil/v3/cpu" + "github.com/shirou/gopsutil/v3/process" ) -func estimateMergeConsume(mobjs []*catalog.ObjectEntry) (origSize, estSize int) { - if len(mobjs) == 0 { +var StopMerge atomic.Bool + +type taskHostKind int + +const ( + taskHostCN taskHostKind = iota + taskHostDN + + constMaxMemCap = 12 * common.Const1GBytes // max original memory for an object + estimateMemUsagePerRow = 30 +) + +func score(objs []*catalog.ObjectEntry) float64 { + if len(objs) < 2 { + return 0 + } + totalDiff := float64(0) + minMaxZM := objs[0].SortKeyZoneMap().Clone() + if !minMaxZM.GetType().IsFixedLen() { + return math.MaxFloat64 + } + for _, obj := range objs { + zm := obj.SortKeyZoneMap() + index.UpdateZM(minMaxZM, zm.GetMinBuf()) + index.UpdateZM(minMaxZM, zm.GetMaxBuf()) + w := diff(zm.GetMax(), zm.GetMin(), zm.GetType()) + if w == math.MaxUint64 { + return math.MaxFloat64 + } + totalDiff += float64(w) + } + maxDiff := diff(minMaxZM.GetMax(), minMaxZM.GetMin(), minMaxZM.GetType()) + if maxDiff == math.MaxUint64 { + return math.MaxFloat64 + } + return totalDiff / float64(maxDiff) +} + +func diff(a, b any, t types.T) uint64 { + switch t { + case types.T_bool: + if a == b { + return 0 + } + return 1 + case types.T_bit: + x, y := a.(uint64), b.(uint64) + return max(x, y) - min(x, y) + case types.T_int8: + x, y := a.(int8), b.(int8) + return uint64(max(x, y) - min(x, y)) + case types.T_int16: + x, y := a.(int16), b.(int16) + return uint64(max(x, y) - min(x, y)) + case types.T_int32: + x, y := a.(int32), b.(int32) + return uint64(max(x, y) - min(x, y)) + case types.T_int64: + x, y := a.(int64), b.(int64) + return uint64(max(x, y) - min(x, y)) + case types.T_uint8: + x, y := a.(uint8), b.(uint8) + return uint64(max(x, y) - min(x, y)) + case types.T_uint16: + x, y := a.(uint16), b.(uint16) + return uint64(max(x, y) - min(x, y)) + case types.T_uint32: + x, y := a.(uint32), b.(uint32) + return uint64(max(x, y) - min(x, y)) + case types.T_uint64: + x, y := a.(uint64), b.(uint64) + return max(x, y) - min(x, y) + case types.T_float32: + x, y := a.(float32), b.(float32) + return uint64(max(x, y) - min(x, y)) + case types.T_float64: + x, y := a.(float64), b.(float64) + return uint64(max(x, y) - min(x, y)) + case types.T_date: + x, y := a.(types.Date), b.(types.Date) + return uint64(max(x, y) - min(x, y)) + case types.T_time: + x, 
y := a.(types.Time), b.(types.Time) + return uint64(max(x, y) - min(x, y)) + case types.T_datetime: + x, y := a.(types.Datetime), b.(types.Datetime) + return uint64(max(x, y) - min(x, y)) + case types.T_timestamp: + x, y := a.(types.Timestamp), b.(types.Timestamp) + return uint64(max(x, y) - min(x, y)) + case types.T_enum: + x, y := a.(types.Enum), b.(types.Enum) + return uint64(max(x, y) - min(x, y)) + case types.T_decimal64: + x, y := a.(types.Decimal64), b.(types.Decimal64) + return uint64(max(x, y) - min(x, y)) + default: + } + return math.MaxUint64 +} + +func removeOversize(objs []*catalog.ObjectEntry) []*catalog.ObjectEntry { + if len(objs) < 2 { + return objs + } + slices.SortFunc(objs, func(a, b *catalog.ObjectEntry) int { + return cmp.Compare(a.OriginSize(), b.OriginSize()) + }) + + if len(objs) == 2 { + if uint(objs[1].OriginSize()) < 3*uint(objs[0].OriginSize()) { + return objs[:2] + } + return nil + } + + accSize := int(objs[0].OriginSize()) + int(objs[1].OriginSize()) + i := 2 + for i < len(objs) { + size := int(objs[i].OriginSize()) + if size > accSize { + break + } + accSize += size + i++ + } + for j := i; j < len(objs); j++ { + objs[j] = nil + } + if i == 2 { + if uint(objs[1].OriginSize()) < 3*uint(objs[0].OriginSize()) { + return objs[:2] + } + return nil + } + return objs[:i] +} + +func estimateMergeSize(objs []*catalog.ObjectEntry) int { + size := 0 + for _, o := range objs { + size += int(o.Rows()) * estimateMemUsagePerRow + } + return size +} + +type resourceController struct { + proc *process.Process + + limit int64 + using int64 + reserved int64 + + reservedMergeRows int64 + transferPageLimit int64 + + cpuPercent float64 +} + +func (c *resourceController) setMemLimit(total uint64) { + cgroup, err := memlimit.FromCgroup() + if cgroup != 0 && cgroup < total { + c.limit = int64(cgroup / 4 * 3) + } else if total != 0 { + c.limit = int64(total / 4 * 3) + } else { + panic("failed to get system total memory") + } + + if c.limit > 200*common.Const1GBytes { + c.transferPageLimit = c.limit / 25 * 2 // 8% + } else if c.limit > 100*common.Const1GBytes { + c.transferPageLimit = c.limit / 25 * 3 // 12% + } else if c.limit > 40*common.Const1GBytes { + c.transferPageLimit = c.limit / 25 * 4 // 16% + } else { + c.transferPageLimit = math.MaxInt64 // no limit + } + + logutil.Info( + "MergeExecutorMemoryInfo", + common.AnyField("container-limit", common.HumanReadableBytes(int(cgroup))), + common.AnyField("host-memory", common.HumanReadableBytes(int(total))), + common.AnyField("merge-limit", common.HumanReadableBytes(int(c.limit))), + common.AnyField("transfer-page-limit", common.HumanReadableBytes(int(c.transferPageLimit))), + common.AnyField("error", err), + ) +} + +func (c *resourceController) refresh() { + if c.limit == 0 { + c.setMemLimit(totalMem()) + } + + if c.proc == nil { + c.proc, _ = process.NewProcess(int32(os.Getpid())) + } + if m, err := c.proc.MemoryInfo(); err == nil { + c.using = int64(m.RSS) + } + + if percents, err := cpu.Percent(0, false); err == nil { + c.cpuPercent = percents[0] + } + c.reservedMergeRows = 0 + c.reserved = 0 +} + +func (c *resourceController) availableMem() int64 { + avail := c.limit - c.using - c.reserved + if avail < 0 { + avail = 0 + } + return avail +} + +func (c *resourceController) printStats() { + if c.reservedMergeRows == 0 && c.availableMem() > 512*common.Const1MBytes { return } - rows := 0 - for _, m := range mobjs { - rows += int(m.Rows()) - origSize += int(m.OriginSize()) + + logutil.Info("MergeExecutorMemoryStats", + 
common.AnyField("merge-limit", common.HumanReadableBytes(int(c.limit))), + common.AnyField("process-mem", common.HumanReadableBytes(int(c.using))), + common.AnyField("reserving-rows", common.HumanReadableBytes(int(c.reservedMergeRows))), + common.AnyField("reserving-mem", common.HumanReadableBytes(int(c.reserved))), + ) +} + +func (c *resourceController) reserveResources(objs []*catalog.ObjectEntry) { + for _, obj := range objs { + c.reservedMergeRows += int64(obj.Rows()) + c.reserved += estimateMemUsagePerRow * int64(obj.Rows()) } - // the main memory consumption is transfer table. - // each row uses 12B, so estimate size is 12 * rows. - estSize = rows * 12 - return } -func entryOutdated(entry *catalog.ObjectEntry, lifetime time.Duration) bool { - createdAt := entry.CreatedAt.Physical() - return time.Unix(0, createdAt).Add(lifetime).Before(time.Now()) +func (c *resourceController) resourceAvailable(objs []*catalog.ObjectEntry) bool { + if c.reservedMergeRows*36 /*28 * 1.3 */ > c.transferPageLimit/8 { + return false + } + + mem := c.availableMem() + if mem > constMaxMemCap { + mem = constMaxMemCap + } + return estimateMergeSize(objs) <= int(2*mem/3) +} + +func objectValid(objectEntry *catalog.ObjectEntry) bool { + if objectEntry.IsAppendable() { + return false + } + if !objectEntry.IsActive() { + return false + } + if !objectEntry.IsCommitted() { + return false + } + if objectEntry.IsCreatingOrAborted() { + return false + } + return true +} + +func CleanUpUselessFiles(entry *api.MergeCommitEntry, fs fileservice.FileService) { + if entry == nil { + return + } + ctx, cancel := context.WithTimeoutCause(context.Background(), 2*time.Minute, moerr.CauseCleanUpUselessFiles) + defer cancel() + for _, filepath := range entry.BookingLoc { + _ = fs.Delete(ctx, filepath) + } + if len(entry.CreatedObjs) != 0 { + for _, obj := range entry.CreatedObjs { + if len(obj) == 0 { + continue + } + s := objectio.ObjectStats(obj) + _ = fs.Delete(ctx, s.ObjectName().String()) + } + } +} + +type policy interface { + onObject(*catalog.ObjectEntry, *BasicPolicyConfig) bool + revise(*resourceController) []reviseResult + resetForTable(*catalog.TableEntry, *BasicPolicyConfig) +} + +func newUpdatePolicyReq(c *BasicPolicyConfig) *api.AlterTableReq { + return &api.AlterTableReq{ + Kind: api.AlterKind_UpdatePolicy, + Operation: &api.AlterTableReq_UpdatePolicy{ + UpdatePolicy: &api.AlterTablePolicy{ + MinOsizeQuailifed: c.ObjectMinOsize, + MaxObjOnerun: uint32(c.MergeMaxOneRun), + MaxOsizeMergedObj: c.MaxOsizeMergedObj, + MinCnMergeSize: c.MinCNMergeSize, + Hints: c.MergeHints, + }, + }, + } } diff --git a/pkg/vm/engine/tae/db/merge/utils_test.go b/pkg/vm/engine/tae/db/merge/utils_test.go new file mode 100644 index 0000000000000..1745a76f7dc88 --- /dev/null +++ b/pkg/vm/engine/tae/db/merge/utils_test.go @@ -0,0 +1,141 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
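Read alongside the Scheduler changes above, the controller is refreshed once per scan in PreExecute, consulted while policies revise their candidates, and reported in PostExecute. The helper below is a hypothetical condensation of the gating the overlap policy applies to each candidate group, using the package-internal types defined in this patch:

// decideHost condenses the per-candidate gating in objOverlapPolicy.revise:
// high CPU defers the merge, available memory is reserved for a TN-hosted
// task, and otherwise the work is offloaded to a CN.
func decideHost(rc *resourceController, objs []*catalog.ObjectEntry) (taskHostKind, bool) {
	if rc.cpuPercent > 80 {
		return taskHostDN, false // skip this candidate for now
	}
	if rc.resourceAvailable(objs) { // estimateMergeSize(objs) fits within 2/3 of available memory
		rc.reserveResources(objs) // books rows * estimateMemUsagePerRow against the limit
		return taskHostDN, true
	}
	return taskHostCN, true
}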
+ +package merge + +import ( + "context" + "math" + "os" + "path" + "testing" + + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/defines" + "github.com/matrixorigin/matrixone/pkg/fileservice" + "github.com/matrixorigin/matrixone/pkg/pb/api" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestScore(t *testing.T) { + o1 := newSortedTestObjectEntry(t, 0, 3, 1) + o2 := newSortedTestObjectEntry(t, 1, 4, 2) + o3 := newSortedTestObjectEntry(t, 1, 5, 4) + o4 := newSortedTestObjectEntry(t, 1, 100, 4) + o5 := newSortedTestObjectEntry(t, 5, 10, math.MaxInt32) + + // should merge + require.Less(t, 1.1, score([]*catalog.ObjectEntry{o1, o1})) + require.Less(t, 1.1, score([]*catalog.ObjectEntry{o1, o2})) + require.Less(t, 1.1, score([]*catalog.ObjectEntry{o1, o3})) + // should not merge + require.Greater(t, 1.1, score([]*catalog.ObjectEntry{o1, o4})) + require.Greater(t, 1.1, score([]*catalog.ObjectEntry{o1, o2, o4})) + require.Greater(t, 1.1, score([]*catalog.ObjectEntry{o1, o5})) + + o6 := newTestVarcharObjectEntry(t, "a", "z", 1) + o7 := newTestVarcharObjectEntry(t, "b", "y", 1) + + require.Less(t, 1.1, score([]*catalog.ObjectEntry{o6, o7})) +} + +func TestRemoveOversize(t *testing.T) { + o1 := newSortedTestObjectEntry(t, 0, 0, 1) + o2 := newSortedTestObjectEntry(t, 0, 0, 2) + o3 := newSortedTestObjectEntry(t, 0, 0, 4) + o5 := newSortedTestObjectEntry(t, 0, 0, math.MaxInt32) + + require.ElementsMatch(t, []*catalog.ObjectEntry{o1, o2}, removeOversize([]*catalog.ObjectEntry{o1, o2})) + require.ElementsMatch(t, []*catalog.ObjectEntry{o1, o2}, removeOversize([]*catalog.ObjectEntry{o5, o1, o2})) + require.ElementsMatch(t, nil, removeOversize([]*catalog.ObjectEntry{o1, o3})) +} + +func TestDiff(t *testing.T) { + require.Equal(t, uint64(0), diff(false, false, types.T_bool)) + require.Equal(t, uint64(1), diff(false, true, types.T_bool)) + require.Equal(t, uint64(10), diff(int8(1), int8(11), types.T_int8)) + require.Equal(t, uint64(10), diff(uint8(1), uint8(11), types.T_uint8)) + require.Equal(t, uint64(10), diff(int16(1), int16(11), types.T_int16)) + require.Equal(t, uint64(10), diff(uint16(1), uint16(11), types.T_uint16)) + require.Equal(t, uint64(10), diff(int32(1), int32(11), types.T_int32)) + require.Equal(t, uint64(10), diff(uint32(1), uint32(11), types.T_uint32)) + require.Equal(t, uint64(10), diff(int64(1), int64(11), types.T_int64)) + require.Equal(t, uint64(10), diff(uint64(1), uint64(11), types.T_uint64)) + require.Equal(t, uint64(10), diff(int8(1), int8(11), types.T_int8)) + require.Equal(t, uint64(10), diff(int8(1), int8(11), types.T_int8)) + require.Equal(t, uint64(10), diff(float32(1), float32(11), types.T_float32)) + require.Equal(t, uint64(10), diff(float64(1), float64(11), types.T_float64)) + require.Equal(t, uint64(10), diff(types.Date(1), types.Date(11), types.T_date)) + require.Equal(t, uint64(10), diff(types.Datetime(1), types.Datetime(11), types.T_datetime)) + require.Equal(t, uint64(10), diff(types.Time(1), types.Time(11), types.T_time)) + require.Equal(t, uint64(10), diff(types.Timestamp(1), types.Timestamp(11), types.T_timestamp)) + require.Equal(t, uint64(10), diff(types.Enum(1), types.Enum(11), types.T_enum)) + require.Equal(t, uint64(10), diff(types.Decimal64(1), types.Decimal64(11), types.T_decimal64)) + require.Equal(t, uint64(math.MaxUint64), diff(types.Decimal128{}, types.Decimal128{}, types.T_decimal128)) +} + +func 
BenchmarkRemoveOversize(b *testing.B) { + o1 := newSortedTestObjectEntry(b, 0, 50, math.MaxInt32) + o2 := newSortedTestObjectEntry(b, 51, 100, 1) + o3 := newSortedTestObjectEntry(b, 49, 52, 2) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + removeOversize([]*catalog.ObjectEntry{o1, o2, o3}) + } +} + +func TestResourceController(t *testing.T) { + rc := new(resourceController) + rc.setMemLimit(10000) + require.Equal(t, int64(7500), rc.limit) + require.Equal(t, int64(7500), rc.availableMem()) + + rc.refresh() + rc.limit = rc.using + 1 + require.Equal(t, int64(1), rc.availableMem()) + + require.Panics(t, func() { rc.setMemLimit(0) }) +} + +func Test_CleanUpUselessFiles(t *testing.T) { + tDir := os.TempDir() + dir := path.Join(tDir, "/local") + assert.NoError(t, os.RemoveAll(dir)) + defer func() { + _ = os.RemoveAll(dir) + }() + + c := fileservice.Config{ + Name: defines.ETLFileServiceName, + Backend: "DISK", + DataDir: dir, + Cache: fileservice.DisabledCacheConfig, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fs, err := fileservice.NewFileService(ctx, c, nil) + assert.Nil(t, err) + defer fs.Close(ctx) + + ent := &api.MergeCommitEntry{ + BookingLoc: []string{"abc"}, + } + + CleanUpUselessFiles(ent, fs) +} diff --git a/pkg/vm/engine/tae/db/open.go b/pkg/vm/engine/tae/db/open.go index 6d4ca5d11f873..3ad4eb740f7bb 100644 --- a/pkg/vm/engine/tae/db/open.go +++ b/pkg/vm/engine/tae/db/open.go @@ -114,11 +114,10 @@ func Open(ctx context.Context, dirname string, opts *options.Options) (db *DB, e } db = &DB{ - Dir: dirname, - Opts: opts, - Closed: new(atomic.Value), - usageMemo: logtail.NewTNUsageMemo(nil), - CNMergeSched: merge.NewTaskServiceGetter(opts.TaskServiceGetter), + Dir: dirname, + Opts: opts, + Closed: new(atomic.Value), + usageMemo: logtail.NewTNUsageMemo(nil), } fs := objectio.NewObjectFS(opts.Fs, serviceDir) localFs := objectio.NewObjectFS(opts.LocalFs, serviceDir) @@ -242,7 +241,7 @@ func Open(ctx context.Context, dirname string, opts *options.Options) (db *DB, e // Init timed scanner scanner := NewDBScanner(db, nil) - db.MergeScheduler = merge.NewScheduler(db.Runtime, db.CNMergeSched) + db.MergeScheduler = merge.NewScheduler(db.Runtime, merge.NewTaskServiceGetter(opts.TaskServiceGetter)) scanner.RegisterOp(db.MergeScheduler) db.Wal.Start() db.BGCheckpointRunner.Start() diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index f3fd527f29d42..e688f8aca3619 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -10025,7 +10025,7 @@ func TestStartStopTableMerge(t *testing.T) { db := testutil.InitTestDB(context.Background(), "MergeTest", t, nil) defer db.Close() - scheduler := merge.NewScheduler(db.Runtime, db.CNMergeSched) + scheduler := merge.NewScheduler(db.Runtime, nil) schema := catalog.MockSchema(2, 0) schema.Extra.BlockMaxRows = 1000 diff --git a/pkg/vm/engine/tae/index/zm.go b/pkg/vm/engine/tae/index/zm.go index 12f89cb61384b..fd56a8bac0645 100644 --- a/pkg/vm/engine/tae/index/zm.go +++ b/pkg/vm/engine/tae/index/zm.go @@ -1243,7 +1243,7 @@ func ZMPlus(v1, v2, res ZM) ZM { } // max = v1.max-v2.min -// min = v1.max-v2.min +// min = v1.min-v2.max func ZMMinus(v1, v2, res ZM) ZM { res.Reset() if !v1.compareCheck(v2) { diff --git a/pkg/vm/engine/tae/mergesort/merger.go b/pkg/vm/engine/tae/mergesort/merger.go index e0a2a07153f21..e2825b77ab126 100644 --- a/pkg/vm/engine/tae/mergesort/merger.go +++ b/pkg/vm/engine/tae/mergesort/merger.go @@ -100,12 +100,11 @@ type merger[T 
comparable] struct { sortKeyIdx int - isTombstone bool - rowPerBlk uint32 - stats mergeStats + rowPerBlk uint32 + stats mergeStats } -func newMerger[T comparable](host MergeTaskHost, lessFunc sort.LessFunc[T], sortKeyPos int, isTombstone bool, df dataFetcher[T]) Merger { +func newMerger[T comparable](host MergeTaskHost, lessFunc sort.LessFunc[T], sortKeyPos int, df dataFetcher[T]) Merger { size := host.GetObjectCnt() rowSizeU64 := host.GetTotalSize() / uint64(host.GetTotalRowCnt()) m := &merger[T]{ @@ -130,7 +129,6 @@ func newMerger[T comparable](host MergeTaskHost, lessFunc sort.LessFunc[T], sort blkPerObj: host.GetObjectMaxBlocks(), }, loadedObjBlkCnts: make([]int, size), - isTombstone: isTombstone, } totalBlkCnt := 0 for _, cnt := range m.objBlkCnts { @@ -215,14 +213,8 @@ func (m *merger[T]) merge(ctx context.Context) error { if m.writer == nil { m.writer = m.host.PrepareNewWriter() } - if m.isTombstone { - if _, err := m.writer.WriteBatch(m.buffer); err != nil { - return err - } - } else { - if _, err := m.writer.WriteBatch(m.buffer); err != nil { - return err - } + if _, err := m.writer.WriteBatch(m.buffer); err != nil { + return err } // force clean m.buffer.CleanOnlyData() @@ -252,15 +244,8 @@ func (m *merger[T]) merge(ctx context.Context) error { if m.writer == nil { m.writer = m.host.PrepareNewWriter() } - if m.isTombstone { - if _, err := m.writer.WriteBatch(m.buffer); err != nil { - return err - } - } else { - - if _, err := m.writer.WriteBatch(m.buffer); err != nil { - return err - } + if _, err := m.writer.WriteBatch(m.buffer); err != nil { + return err } m.buffer.CleanOnlyData() } @@ -345,7 +330,7 @@ func (m *merger[T]) release() { } } -func mergeObjs(ctx context.Context, mergeHost MergeTaskHost, sortKeyPos int, isTombstone bool) error { +func mergeObjs(ctx context.Context, mergeHost MergeTaskHost, sortKeyPos int) error { var merger Merger typ := mergeHost.GetSortKeyType() size := mergeHost.GetObjectCnt() @@ -356,7 +341,7 @@ func mergeObjs(ctx context.Context, mergeHost MergeTaskHost, sortKeyPos int, isT area []byte }, size), } - merger = newMerger(mergeHost, sort.GenericLess[string], sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.GenericLess[string], sortKeyPos, df) } else { switch typ.Oid { case types.T_bool: @@ -364,139 +349,139 @@ func mergeObjs(ctx context.Context, mergeHost MergeTaskHost, sortKeyPos int, isT mustColFunc: vector.MustFixedColNoTypeCheck[bool], cols: make([][]bool, size), } - merger = newMerger(mergeHost, sort.BoolLess, sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.BoolLess, sortKeyPos, df) case types.T_bit: df := &fixedDataFetcher[uint64]{ mustColFunc: vector.MustFixedColNoTypeCheck[uint64], cols: make([][]uint64, size), } - merger = newMerger(mergeHost, sort.GenericLess[uint64], sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.GenericLess[uint64], sortKeyPos, df) case types.T_int8: df := &fixedDataFetcher[int8]{ mustColFunc: vector.MustFixedColNoTypeCheck[int8], cols: make([][]int8, size), } - merger = newMerger(mergeHost, sort.GenericLess[int8], sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.GenericLess[int8], sortKeyPos, df) case types.T_int16: df := &fixedDataFetcher[int16]{ mustColFunc: vector.MustFixedColNoTypeCheck[int16], cols: make([][]int16, size), } - merger = newMerger(mergeHost, sort.GenericLess[int16], sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.GenericLess[int16], sortKeyPos, df) case types.T_int32: df := &fixedDataFetcher[int32]{ mustColFunc: 
vector.MustFixedColNoTypeCheck[int32], cols: make([][]int32, size), } - merger = newMerger(mergeHost, sort.GenericLess[int32], sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.GenericLess[int32], sortKeyPos, df) case types.T_int64: df := &fixedDataFetcher[int64]{ mustColFunc: vector.MustFixedColNoTypeCheck[int64], cols: make([][]int64, size), } - merger = newMerger(mergeHost, sort.GenericLess[int64], sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.GenericLess[int64], sortKeyPos, df) case types.T_float32: df := &fixedDataFetcher[float32]{ mustColFunc: vector.MustFixedColNoTypeCheck[float32], cols: make([][]float32, size), } - merger = newMerger(mergeHost, sort.GenericLess[float32], sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.GenericLess[float32], sortKeyPos, df) case types.T_float64: df := &fixedDataFetcher[float64]{ mustColFunc: vector.MustFixedColNoTypeCheck[float64], cols: make([][]float64, size), } - merger = newMerger(mergeHost, sort.GenericLess[float64], sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.GenericLess[float64], sortKeyPos, df) case types.T_uint8: df := &fixedDataFetcher[uint8]{ mustColFunc: vector.MustFixedColNoTypeCheck[uint8], cols: make([][]uint8, size), } - merger = newMerger(mergeHost, sort.GenericLess[uint8], sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.GenericLess[uint8], sortKeyPos, df) case types.T_uint16: df := &fixedDataFetcher[uint16]{ mustColFunc: vector.MustFixedColNoTypeCheck[uint16], cols: make([][]uint16, size), } - merger = newMerger(mergeHost, sort.GenericLess[uint16], sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.GenericLess[uint16], sortKeyPos, df) case types.T_uint32: df := &fixedDataFetcher[uint32]{ mustColFunc: vector.MustFixedColNoTypeCheck[uint32], cols: make([][]uint32, size), } - merger = newMerger(mergeHost, sort.GenericLess[uint32], sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.GenericLess[uint32], sortKeyPos, df) case types.T_uint64: df := &fixedDataFetcher[uint64]{ mustColFunc: vector.MustFixedColNoTypeCheck[uint64], cols: make([][]uint64, size), } - merger = newMerger(mergeHost, sort.GenericLess[uint64], sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.GenericLess[uint64], sortKeyPos, df) case types.T_date: df := &fixedDataFetcher[types.Date]{ mustColFunc: vector.MustFixedColNoTypeCheck[types.Date], cols: make([][]types.Date, size), } - merger = newMerger(mergeHost, sort.GenericLess[types.Date], sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.GenericLess[types.Date], sortKeyPos, df) case types.T_timestamp: df := &fixedDataFetcher[types.Timestamp]{ mustColFunc: vector.MustFixedColNoTypeCheck[types.Timestamp], cols: make([][]types.Timestamp, size), } - merger = newMerger(mergeHost, sort.GenericLess[types.Timestamp], sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.GenericLess[types.Timestamp], sortKeyPos, df) case types.T_datetime: df := &fixedDataFetcher[types.Datetime]{ mustColFunc: vector.MustFixedColNoTypeCheck[types.Datetime], cols: make([][]types.Datetime, size), } - merger = newMerger(mergeHost, sort.GenericLess[types.Datetime], sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.GenericLess[types.Datetime], sortKeyPos, df) case types.T_time: df := &fixedDataFetcher[types.Time]{ mustColFunc: vector.MustFixedColNoTypeCheck[types.Time], cols: make([][]types.Time, size), } - merger = newMerger(mergeHost, sort.GenericLess[types.Time], 
sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.GenericLess[types.Time], sortKeyPos, df) case types.T_enum: df := &fixedDataFetcher[types.Enum]{ mustColFunc: vector.MustFixedColNoTypeCheck[types.Enum], cols: make([][]types.Enum, size), } - merger = newMerger(mergeHost, sort.GenericLess[types.Enum], sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.GenericLess[types.Enum], sortKeyPos, df) case types.T_decimal64: df := &fixedDataFetcher[types.Decimal64]{ mustColFunc: vector.MustFixedColNoTypeCheck[types.Decimal64], cols: make([][]types.Decimal64, size), } - merger = newMerger(mergeHost, sort.Decimal64Less, sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.Decimal64Less, sortKeyPos, df) case types.T_decimal128: df := &fixedDataFetcher[types.Decimal128]{ mustColFunc: vector.MustFixedColNoTypeCheck[types.Decimal128], cols: make([][]types.Decimal128, size), } - merger = newMerger(mergeHost, sort.Decimal128Less, sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.Decimal128Less, sortKeyPos, df) case types.T_uuid: df := &fixedDataFetcher[types.Uuid]{ mustColFunc: vector.MustFixedColNoTypeCheck[types.Uuid], cols: make([][]types.Uuid, size), } - merger = newMerger(mergeHost, sort.UuidLess, sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.UuidLess, sortKeyPos, df) case types.T_TS: df := &fixedDataFetcher[types.TS]{ mustColFunc: vector.MustFixedColNoTypeCheck[types.TS], cols: make([][]types.TS, size), } - merger = newMerger(mergeHost, sort.TsLess, sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.TsLess, sortKeyPos, df) case types.T_Rowid: df := &fixedDataFetcher[types.Rowid]{ mustColFunc: vector.MustFixedColNoTypeCheck[types.Rowid], cols: make([][]types.Rowid, size), } - merger = newMerger(mergeHost, sort.RowidLess, sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.RowidLess, sortKeyPos, df) case types.T_Blockid: df := &fixedDataFetcher[types.Blockid]{ mustColFunc: vector.MustFixedColNoTypeCheck[types.Blockid], cols: make([][]types.Blockid, size), } - merger = newMerger(mergeHost, sort.BlockidLess, sortKeyPos, isTombstone, df) + merger = newMerger(mergeHost, sort.BlockidLess, sortKeyPos, df) default: return moerr.NewErrUnsupportedDataType(ctx, typ) } diff --git a/pkg/vm/engine/tae/mergesort/task.go b/pkg/vm/engine/tae/mergesort/task.go index 1c3305f83352d..3b0d0c44bc453 100644 --- a/pkg/vm/engine/tae/mergesort/task.go +++ b/pkg/vm/engine/tae/mergesort/task.go @@ -17,6 +17,7 @@ package mergesort import ( "context" "fmt" + "strings" "time" "github.com/docker/go-units" @@ -29,6 +30,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/pb/api" + v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "go.uber.org/zap" @@ -88,30 +90,16 @@ func DoMergeAndWrite( txnInfo string, sortkeyPos int, mergehost MergeTaskHost, - isTombstone bool, ) (err error) { - now := time.Now() + start := time.Now() /*out args, keep the transfer information*/ commitEntry := mergehost.GetCommitEntry() - fromObjsDesc := "" - fromSize := uint32(0) - for _, o := range commitEntry.MergedObjs { - obj := objectio.ObjectStats(o) - fromObjsDesc += fmt.Sprintf("%s(%v, %s)Rows(%v),", - obj.ObjectName().ObjectId().ShortStringEx(), - obj.BlkCnt(), - units.BytesSize(float64(obj.OriginSize())), - obj.Rows()) - 
fromSize += obj.OriginSize() - } - logutil.Info( - "[MERGE-START]", - zap.String("task", mergehost.Name()), - common.AnyField("txn-info", txnInfo), - common.AnyField("host", mergehost.HostHintName()), - common.AnyField("timestamp", commitEntry.StartTs.DebugString()), - zap.String("from-objs", fromObjsDesc), - zap.String("from-size", units.BytesSize(float64(fromSize))), + logMergeStart( + mergehost.Name(), + txnInfo, + mergehost.HostHintName(), + commitEntry.StartTs.DebugString(), + commitEntry.MergedObjs, ) defer func() { if err != nil { @@ -123,13 +111,8 @@ func DoMergeAndWrite( } }() - hasSortKey := sortkeyPos >= 0 - if !hasSortKey { - sortkeyPos = 0 // no sort key, use the first column to do reshape - } - - if hasSortKey { - if err = mergeObjs(ctx, mergehost, sortkeyPos, isTombstone); err != nil { + if sortkeyPos >= 0 { + if err = mergeObjs(ctx, mergehost, sortkeyPos); err != nil { return err } } else { @@ -138,25 +121,7 @@ func DoMergeAndWrite( } } - toObjsDesc := "" - toSize := uint32(0) - for _, o := range commitEntry.CreatedObjs { - obj := objectio.ObjectStats(o) - toObjsDesc += fmt.Sprintf("%s(%v, %s)Rows(%v),", - obj.ObjectName().ObjectId().ShortStringEx(), - obj.BlkCnt(), - units.BytesSize(float64(obj.OriginSize())), - obj.Rows()) - toSize += obj.OriginSize() - } - - logutil.Info( - "[MERGE-END]", - zap.String("task", mergehost.Name()), - common.AnyField("to-objs", toObjsDesc), - common.AnyField("to-size", units.BytesSize(float64(toSize))), - common.DurationField(time.Since(now)), - ) + logMergeEnd(mergehost.Name(), start, commitEntry.CreatedObjs) return nil } @@ -238,3 +203,67 @@ func UpdateMappingAfterMerge(b api.TransferMaps, mapping []int, toLayout []uint3 totalHandledRows += uint32(size) } } + +func logMergeStart(name, txnInfo, host, startTS string, mergedObjs [][]byte) { + var fromObjsDescBuilder strings.Builder + fromSize, estSize := float64(0), float64(0) + rows, blkn := 0, 0 + for _, o := range mergedObjs { + obj := objectio.ObjectStats(o) + fromObjsDescBuilder.WriteString(fmt.Sprintf("%s(%v, %s)Rows(%v)[%v, %v],", + obj.ObjectName().ObjectId().ShortStringEx(), + obj.BlkCnt(), + units.BytesSize(float64(obj.OriginSize())), + obj.Rows(), + obj.SortKeyZoneMap().GetMin(), + obj.SortKeyZoneMap().GetMax())) + fromSize += float64(obj.OriginSize()) + estSize += float64(obj.Rows() * 20) + rows += int(obj.Rows()) + blkn += int(obj.BlkCnt()) + } + + logutil.Info( + "[MERGE-START]", + zap.String("task", name), + common.AnyField("txn-info", txnInfo), + common.AnyField("host", host), + common.AnyField("timestamp", startTS), + zap.String("from-objs", fromObjsDescBuilder.String()), + zap.String("from-size", units.BytesSize(fromSize)), + zap.String("est-size", units.BytesSize(estSize)), + zap.Int("num-obj", len(mergedObjs)), + common.AnyField("num-blk", blkn), + common.AnyField("rows", rows), + ) + + if host == "TN" { + v2.TaskDNMergeScheduledByCounter.Inc() + v2.TaskDNMergedSizeCounter.Add(fromSize) + } else if host == "CN" { + v2.TaskCNMergeScheduledByCounter.Inc() + v2.TaskCNMergedSizeCounter.Add(fromSize) + } +} + +func logMergeEnd(name string, start time.Time, objs [][]byte) { + toObjsDesc := "" + toSize := float64(0) + for _, o := range objs { + obj := objectio.ObjectStats(o) + toObjsDesc += fmt.Sprintf("%s(%v, %s)Rows(%v),", + obj.ObjectName().ObjectId().ShortStringEx(), + obj.BlkCnt(), + units.BytesSize(float64(obj.OriginSize())), + obj.Rows()) + toSize += float64(obj.OriginSize()) + } + + logutil.Info( + "[MERGE-END]", + zap.String("task", name), + common.AnyField("to-objs", 
toObjsDesc), + common.AnyField("to-size", units.BytesSize(toSize)), + common.DurationField(time.Since(start)), + ) +} diff --git a/pkg/vm/engine/tae/rpc/handle_debug.go b/pkg/vm/engine/tae/rpc/handle_debug.go index b2fc458423a9b..ad3c33dc743af 100644 --- a/pkg/vm/engine/tae/rpc/handle_debug.go +++ b/pkg/vm/engine/tae/rpc/handle_debug.go @@ -569,7 +569,7 @@ func (h *Handle) HandleCommitMerge( stat := objectio.ObjectStats(o) ids = append(ids, *stat.ObjectName().ObjectId()) } - merge.ActiveCNObj.RemoveActiveCNObj(ids) + h.GetDB().MergeScheduler.RemoveCNActiveObjects(ids) if req.Err != "" { resp.ReturnStr = req.Err err = moerr.NewInternalErrorf(ctx, "merge err in cn: %s", req.Err) diff --git a/pkg/vm/engine/tae/rpc/handle_test.go b/pkg/vm/engine/tae/rpc/handle_test.go index c065f30435c32..de28d4a318f94 100644 --- a/pkg/vm/engine/tae/rpc/handle_test.go +++ b/pkg/vm/engine/tae/rpc/handle_test.go @@ -78,7 +78,7 @@ func TestHandleInspectPolicy(t *testing.T) { Operation: "policy", }, resp) require.NoError(t, err) - require.Equal(t, "(*) maxMergeObjN: 16, maxOsizeObj: 128MB, minOsizeQualified: 110MB, offloadToCnSize: 80000MB, hints: [Auto]", resp.Message) + require.Equal(t, "(*) maxMergeObjN: 16, maxOsizeObj: 128MB, minOsizeQualified: 90MB, offloadToCnSize: 80000MB, hints: [Auto]", resp.Message) _, err = handle.HandleInspectTN(context.Background(), txn.TxnMeta{}, &cmd_util.InspectTN{ AccessInfo: cmd_util.AccessInfo{}, @@ -92,7 +92,7 @@ func TestHandleInspectPolicy(t *testing.T) { Operation: "policy -t db1.test1 -s true", }, resp) require.NoError(t, err) - require.Equal(t, "(1000-test1) maxMergeObjN: 16, maxOsizeObj: 128MB, minOsizeQualified: 110MB, offloadToCnSize: 80000MB, hints: [Auto]", resp.Message) + require.Equal(t, "(1000-test1) maxMergeObjN: 16, maxOsizeObj: 128MB, minOsizeQualified: 90MB, offloadToCnSize: 80000MB, hints: [Auto]", resp.Message) _, err = handle.HandleInspectTN(context.Background(), txn.TxnMeta{}, &cmd_util.InspectTN{ AccessInfo: cmd_util.AccessInfo{}, @@ -106,7 +106,7 @@ func TestHandleInspectPolicy(t *testing.T) { Operation: "policy -t db1.test1", }, resp) require.NoError(t, err) - require.Equal(t, "(1000-test1) maxMergeObjN: 16, maxOsizeObj: 128MB, minOsizeQualified: 110MB, offloadToCnSize: 80000MB, hints: [Auto]", resp.Message) + require.Equal(t, "(1000-test1) maxMergeObjN: 16, maxOsizeObj: 128MB, minOsizeQualified: 90MB, offloadToCnSize: 80000MB, hints: [Auto]", resp.Message) } func TestHandlePrecommitWriteError(t *testing.T) { diff --git a/pkg/vm/engine/tae/rpc/inspect.go b/pkg/vm/engine/tae/rpc/inspect.go index bf71ea35256b3..bb612c58a6c15 100644 --- a/pkg/vm/engine/tae/rpc/inspect.go +++ b/pkg/vm/engine/tae/rpc/inspect.go @@ -854,25 +854,22 @@ func (c *mergePolicyArg) Run() error { if err != nil { return err } - err = c.ctx.db.MergeScheduler.ConfigPolicy(c.tbl, txn, &merge.BasicPolicyConfig{ + if err = c.ctx.db.MergeScheduler.ConfigPolicy(c.tbl, txn, &merge.BasicPolicyConfig{ MergeMaxOneRun: int(c.maxMergeObjN), ObjectMinOsize: minosize, MaxOsizeMergedObj: maxosize, MinCNMergeSize: cnsize, MergeHints: c.hints, - }) - if err != nil { + }); err != nil { return err } if c.stopMerge { - err = c.ctx.db.MergeScheduler.StopMerge(c.tbl, false) - if err != nil { + if err = c.ctx.db.MergeScheduler.StopMerge(c.tbl, false); err != nil { return err } } else { if c.ctx.db.Runtime.LockMergeService.IsLockedByUser(c.tbl.GetID(), c.tbl.GetLastestSchema(false).Name) { - err = c.ctx.db.MergeScheduler.StartMerge(c.tbl.GetID(), false) - if err != nil { + if err = 
c.ctx.db.MergeScheduler.StartMerge(c.tbl.GetID(), false); err != nil { return err } } @@ -967,10 +964,10 @@ func (c *PolicyStatus) String() string { func (c *PolicyStatus) Run() (err error) { if c.pruneAgo == 0 && c.pruneId == 0 { - c.ctx.resp.Payload = []byte(merge.ActiveCNObj.String()) + c.ctx.resp.Payload = []byte(c.ctx.db.MergeScheduler.CNActiveObjectsString()) return nil } else { - merge.ActiveCNObj.Prune(c.pruneId, c.pruneAgo) + c.ctx.db.MergeScheduler.PruneCNActiveObjects(c.pruneId, c.pruneAgo) return nil } } diff --git a/pkg/vm/engine/tae/tables/jobs/mergeobjects.go b/pkg/vm/engine/tae/tables/jobs/mergeobjects.go index 84176a8c0e810..5636a586ad794 100644 --- a/pkg/vm/engine/tae/tables/jobs/mergeobjects.go +++ b/pkg/vm/engine/tae/tables/jobs/mergeobjects.go @@ -188,7 +188,7 @@ func (task *mergeObjectsTask) GetMPool() *mpool.MPool { return task.rt.VectorPool.Transient.GetMPool() } -func (task *mergeObjectsTask) HostHintName() string { return "DN" } +func (task *mergeObjectsTask) HostHintName() string { return "TN" } func (task *mergeObjectsTask) LoadNextBatch( ctx context.Context, objIdx uint32, @@ -369,7 +369,7 @@ func (task *mergeObjectsTask) Execute(ctx context.Context) (err error) { return moerr.NewInternalErrorNoCtxf("LockMerge give up in exec %v", task.Name()) } phaseDesc = "1-DoMergeAndWrite" - if err = mergesort.DoMergeAndWrite(ctx, task.txn.String(), sortkeyPos, task, task.isTombstone); err != nil { + if err = mergesort.DoMergeAndWrite(ctx, task.txn.String(), sortkeyPos, task); err != nil { return err } diff --git a/pkg/vm/engine/tae/tables/txnentries/mergeobjects.go b/pkg/vm/engine/tae/tables/txnentries/mergeobjects.go index c61335d769019..e62627bc06eaa 100644 --- a/pkg/vm/engine/tae/tables/txnentries/mergeobjects.go +++ b/pkg/vm/engine/tae/tables/txnentries/mergeobjects.go @@ -289,9 +289,9 @@ func (entry *mergeObjectsEntry) transferObjectDeletes( _max = k } } - panic(fmt.Sprintf( - "%s-%d find no transfer mapping for row %d, mapping range (%d, %d)", - dropped.ID().String(), blkOffsetInObj, row, _min, _max)) + err = moerr.NewInternalErrorNoCtxf("%s-%d find no transfer mapping for row %d, mapping range (%d, %d)", + dropped.ID().String(), blkOffsetInObj, row, _min, _max) + return } if entry.delTbls[*entry.createdObjs[destpos.ObjIdx].ID()] == nil { entry.delTbls[*entry.createdObjs[destpos.ObjIdx].ID()] = make(map[uint16]struct{}) diff --git a/pkg/vm/engine/tae/tasks/worker/worker.go b/pkg/vm/engine/tae/tasks/worker/worker.go index c8a33b849151d..60b420d96e485 100644 --- a/pkg/vm/engine/tae/tasks/worker/worker.go +++ b/pkg/vm/engine/tae/tasks/worker/worker.go @@ -29,7 +29,8 @@ import ( type Cmd = uint8 const ( - QUIT Cmd = iota + QUIT Cmd = iota + stuck // For tests only. Stop executing Ops. 
) const ( @@ -198,8 +199,13 @@ func (w *OpWorker) SendOp(op iops.IOp) bool { w.Pending.Add(-1) return false } - w.OpC <- op - return true + select { + case w.OpC <- op: + return true + default: + } + w.Pending.Add(-1) + return false } func (w *OpWorker) opCancelOp(op iops.IOp) { @@ -228,6 +234,9 @@ func (w *OpWorker) onCmd(cmd Cmd) { panic("logic error") } w.ClosedCh <- struct{}{} + case stuck: // For test only + <-w.Ctx.Done() + return default: panic(fmt.Sprintf("Unsupported cmd %d", cmd)) } diff --git a/pkg/vm/engine/tae/tasks/worker/worker_test.go b/pkg/vm/engine/tae/tasks/worker/worker_test.go new file mode 100644 index 0000000000000..dd302273c7bd4 --- /dev/null +++ b/pkg/vm/engine/tae/tasks/worker/worker_test.go @@ -0,0 +1,75 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ops + +import ( + "context" + iops "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/ops/base" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +type testOp struct{} + +func (t testOp) OnExec(context.Context) error { + return nil +} + +func (t testOp) SetError(error) {} + +func (t testOp) GetError() error { + return nil +} + +func (t testOp) WaitDone(context.Context) error { + return nil +} + +func (t testOp) Waitable() bool { + return true +} + +func (t testOp) GetCreateTime() time.Time { + return time.Time{} +} + +func (t testOp) GetStartTime() time.Time { + return time.Time{} +} + +func (t testOp) GetEndTime() time.Time { + return time.Time{} +} + +func (t testOp) GetExecutTime() int64 { + return 0 +} + +func (t testOp) AddObserver(iops.Observer) {} + +func TestOpWorker(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + worker := NewOpWorker(ctx, "test", 100) + worker.Start() + worker.CmdC <- stuck + for len(worker.CmdC) > 0 { + } + for i := 0; i < 100; i++ { + require.True(t, worker.SendOp(testOp{})) + } + require.Falsef(t, worker.SendOp(testOp{}), "op channel should be full") +} diff --git a/test/distributed/cases/function/mo_ctl/mo_ctl_merge.result b/test/distributed/cases/function/mo_ctl/mo_ctl_merge.result index 0fb7b1408e35c..c0f19dbdd2a88 100644 --- a/test/distributed/cases/function/mo_ctl/mo_ctl_merge.result +++ b/test/distributed/cases/function/mo_ctl/mo_ctl_merge.result @@ -242,4 +242,4 @@ rows_cnt 2 select mo_ctl('dn', 'inspect', 'policy'); mo_ctl(dn, inspect, policy) -\nmsg: (*) maxMergeObjN: 16, maxOsizeObj: 128MB, minOsizeQualified: 110MB, offloadToCnSize: 80000MB, hints: [Auto]\n\ngeneral setting has been refreshed +\nmsg: (*) maxMergeObjN: 16, maxOsizeObj: 128MB, minOsizeQualified: 90MB, offloadToCnSize: 80000MB, hints: [Auto]\n\ngeneral setting has been refreshed
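
Editor's note on the worker change above: SendOp no longer does an unconditional blocking send on w.OpC. It reserves a Pending slot, attempts the send inside a select with a default branch, and, if the channel is full, releases the slot and returns false; the test-only stuck command lets worker_test.go stall the worker loop and confirm that SendOp starts rejecting ops once the channel fills. The standalone Go sketch below illustrates the same non-blocking enqueue pattern in isolation; the queue and trySend names are hypothetical and not part of this patch, and the real SendOp additionally checks the worker state before reserving a slot.

package main

import (
	"fmt"
	"sync/atomic"
)

// queue is a hypothetical stand-in for OpWorker: a bounded op channel plus a
// pending counter, mirroring the reworked SendOp in worker.go.
type queue struct {
	opC     chan int
	pending atomic.Int64
}

// trySend reserves a pending slot, then attempts a non-blocking send.
// When the channel is full, the slot is released and false is returned,
// so a stalled consumer never blocks the caller.
func (q *queue) trySend(op int) bool {
	q.pending.Add(1)
	select {
	case q.opC <- op:
		return true
	default:
	}
	q.pending.Add(-1)
	return false
}

func main() {
	q := &queue{opC: make(chan int, 2)} // capacity 2 and no consumer: the third send must fail
	for i := 0; i < 4; i++ {
		fmt.Printf("send %d accepted=%v\n", i, q.trySend(i))
	}
}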