Skip to content

Commit

Permalink
align merge with main (#20488)
Browse files Browse the repository at this point in the history
align merge with main

Approved by: @aressu1985, @XuPeng-SH, @zhangxu19830126, @sukki37
  • Loading branch information
w-zr authored Dec 18, 2024
1 parent a7f27c7 commit fa228fe
Show file tree
Hide file tree
Showing 37 changed files with 1,520 additions and 1,136 deletions.
1 change: 1 addition & 0 deletions pkg/common/moerr/cause.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ var (
//pkg/vm/engine/tae/db/merge
CauseCleanUpUselessFiles = NewInternalError(context.Background(), "CleanUpUselessFiles")
CauseOnObject = NewInternalError(context.Background(), "OnObject")
CauseCreateCNMerge = NewInternalError(context.Background(), "CreateCNMergeTask")
//pkg/vm/engine/tae/logstore/driver/logservicedriver
CauseDriverAppender1 = NewInternalError(context.Background(), "DriverAppender append 1")
CauseDriverAppender2 = NewInternalError(context.Background(), "DriverAppender append 2")
Expand Down
4 changes: 3 additions & 1 deletion pkg/taskservice/mysql_task_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,9 @@ func (m *mysqlTaskStorage) UpdateAsyncTask(ctx context.Context, tasks []task.Asy
}
if t.ExecuteResult != nil {
execResult.Code = t.ExecuteResult.Code
execResult.Error = t.ExecuteResult.Error
if len(execResult.Error) > 1000 {
execResult.Error = execResult.Error[:1000]
}
}

j, err := json.Marshal(t.Metadata.Options)
Expand Down
8 changes: 5 additions & 3 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -2172,13 +2172,15 @@ func (tbl *txnTable) MergeObjects(

sortKeyPos, sortKeyIsPK := tbl.getSortKeyPosAndSortKeyIsPK()

// check object visibility
for _, objstat := range objStats {
// check object visibility and set object stats.
for i, objstat := range objStats {
info, exist := state.GetObject(*objstat.ObjectShortName())
if !exist || (!info.DeleteTime.IsEmpty() && info.DeleteTime.LE(&snapshot)) {
logutil.Errorf("object not visible: %s", info.String())
return nil, moerr.NewInternalErrorNoCtxf("object %s not exist", objstat.ObjectName().String())
}
objectio.SetObjectStats(&objstat, &info.ObjectStats)
objStats[i] = objstat
}

tbl.ensureSeqnumsAndTypesExpectRowid()
Expand All @@ -2192,7 +2194,7 @@ func (tbl *txnTable) MergeObjects(
return nil, err
}

err = mergesort.DoMergeAndWrite(ctx, tbl.getTxn().op.Txn().DebugString(), sortKeyPos, taskHost, false)
err = mergesort.DoMergeAndWrite(ctx, tbl.getTxn().op.Txn().DebugString(), sortKeyPos, taskHost)
if err != nil {
taskHost.commitEntry.Err = err.Error()
return taskHost.commitEntry, err
Expand Down
47 changes: 34 additions & 13 deletions pkg/vm/engine/tae/catalog/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"math"
"slices"
"sync/atomic"

pkgcatalog "github.com/matrixorigin/matrixone/pkg/catalog"
Expand Down Expand Up @@ -379,6 +380,7 @@ func (entry *TableEntry) ObjectStats(level common.PPLevel, start, end int, isTom
needCount = math.MaxInt
}

objEntries := make([]*ObjectEntry, 0)
for it.Next() {
objectEntry := it.Item()
if !objectEntry.IsActive() {
Expand All @@ -392,28 +394,45 @@ func (entry *TableEntry) ObjectStats(level common.PPLevel, start, end int, isTom
break
}
needCount--
objEntries = append(objEntries, objectEntry)

stat.ObjectCnt += 1
if objectEntry.GetLoaded() {
stat.Loaded += 1
stat.Rows += int(objectEntry.Rows())
stat.OSize += int(objectEntry.OriginSize())
stat.Csize += int(objectEntry.Size())
}
if level > common.PPL0 {
_ = w.WriteByte('\n')
_, _ = w.WriteString(objectEntry.ID().String())
}

slices.SortFunc(objEntries, func(a, b *ObjectEntry) int {
zmA := a.SortKeyZoneMap()
zmB := b.SortKeyZoneMap()

c := zmA.CompareMin(zmB)
if c != 0 {
return c
}
return zmA.CompareMax(zmB)
})

if level > common.PPL0 {
for _, objEntry := range objEntries {
_ = w.WriteByte('\n')
_, _ = w.WriteString(" ")
_, _ = w.WriteString(objectEntry.StatsString(zonemapKind))
_, _ = w.WriteString(objEntry.ID().String())
_, _ = w.WriteString("\n ")
_, _ = w.WriteString(objEntry.StatsString(zonemapKind))

if w.Len() > 8*common.Const1MBytes {
w.WriteString("\n...(truncated for too long, more than 8 MB)")
break
}
}
if w.Len() > 8*common.Const1MBytes {
w.WriteString("\n...(truncated for too long, more than 8 MB)")
break
if stat.ObjectCnt > 0 {
w.WriteByte('\n')
}
}
if level > common.PPL0 && stat.ObjectCnt > 0 {
w.WriteByte('\n')
}

return
}

Expand All @@ -428,8 +447,10 @@ func (entry *TableEntry) ObjectStatsString(level common.PPLevel, start, end int,
}

summary := fmt.Sprintf(
"summary: %d total, %d unknown, avgRow %d, avgOsize %s, avgCsize %v",
stat.ObjectCnt, stat.ObjectCnt-stat.Loaded, avgRow, common.HumanReadableBytes(avgOsize), common.HumanReadableBytes(avgCsize),
"summary: %d objs, %d unloaded, total orignal size:%s, average orignal size:%s, average rows:%d, average compressed size:%s",
stat.ObjectCnt, stat.ObjectCnt-stat.Loaded,
common.HumanReadableBytes(stat.OSize), common.HumanReadableBytes(avgOsize),
avgRow, common.HumanReadableBytes(avgCsize),
)
detail.WriteString(summary)
return detail.String()
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/common/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

const (
DefaultMinOsizeQualifiedMB = 110 // MB
DefaultMinOsizeQualifiedMB = 90 // MB
DefaultMaxOsizeObjMB = 128 // MB
DefaultMinCNMergeSize = 80000 // MB
DefaultCNMergeMemControlHint = 8192 // MB
Expand Down
2 changes: 0 additions & 2 deletions pkg/vm/engine/tae/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ type DB struct {

DBLocker io.Closer

CNMergeSched merge.CNMergeScheduler

Closed *atomic.Value
}

Expand Down
154 changes: 154 additions & 0 deletions pkg/vm/engine/tae/db/merge/cnScheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package merge

import (
"bytes"
"context"
"fmt"
"strconv"
"sync"
"time"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/pb/api"
taskpb "github.com/matrixorigin/matrixone/pkg/pb/task"
"github.com/matrixorigin/matrixone/pkg/taskservice"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
)

func NewTaskServiceGetter(getter taskservice.Getter) *CNMergeScheduler {
return &CNMergeScheduler{
getter: getter,
activeObjects: struct {
sync.Mutex
o map[objectio.ObjectId]activeEntry
}{o: make(map[objectio.ObjectId]activeEntry)},
}
}

type CNMergeScheduler struct {
getter taskservice.Getter

activeObjects struct {
sync.Mutex
o map[objectio.ObjectId]activeEntry
}
}

func (s *CNMergeScheduler) sendMergeTask(ctx context.Context, task *api.MergeTaskEntry) error {
ts, ok := s.getter()
if !ok {
return taskservice.ErrNotReady
}
taskIDPrefix := "Merge:" + strconv.Itoa(int(task.TblId))
asyncTask, err := ts.QueryAsyncTask(ctx,
taskservice.WithTaskMetadataId(taskservice.LIKE, taskIDPrefix+"%"),
taskservice.WithTaskStatusCond(taskpb.TaskStatus_Created, taskpb.TaskStatus_Running))
if err != nil {
return err
}
if len(asyncTask) != 0 {
return moerr.NewInternalError(ctx, fmt.Sprintf("table %q is merging", task.TableName))
}
b, err := task.Marshal()
if err != nil {
return err
}
return ts.CreateAsyncTask(ctx,
taskpb.TaskMetadata{
ID: taskIDPrefix + ":" + strconv.FormatInt(time.Now().Unix(), 10),
Executor: taskpb.TaskCode_MergeObject,
Context: b,
Options: taskpb.TaskOptions{Resource: &taskpb.Resource{Memory: task.EstimatedMemUsage}},
})
}

func (s *CNMergeScheduler) addActiveObjects(entries []*catalog.ObjectEntry) {
s.activeObjects.Lock()
for _, entry := range entries {
s.activeObjects.o[*entry.ID()] = activeEntry{
entry.GetTable().ID,
time.Now(),
}
}
s.activeObjects.Unlock()
}

func (s *CNMergeScheduler) checkOverlapOnCNActive(entries []*catalog.ObjectEntry) bool {
s.activeObjects.Lock()
defer s.activeObjects.Unlock()
for _, entry := range entries {
if _, ok := s.activeObjects.o[*entry.ID()]; ok {
return true
}
}
return false
}

func (s *CNMergeScheduler) activeObjsString() string {
s.activeObjects.Lock()
defer s.activeObjects.Unlock()

b := &bytes.Buffer{}
now := time.Now()
for k, v := range s.activeObjects.o {
b.WriteString(fmt.Sprintf(" id: %v, table: %v, insertAt: %s ago\n",
k.String(), v.tid, now.Sub(v.insertAt).String()))
}
return b.String()
}

func (s *CNMergeScheduler) removeActiveObject(ids []objectio.ObjectId) {
s.activeObjects.Lock()
defer s.activeObjects.Unlock()
for _, id := range ids {
delete(s.activeObjects.o, id)
}
}

func (s *CNMergeScheduler) prune(id uint64, ago time.Duration) {
s.activeObjects.Lock()
defer s.activeObjects.Unlock()
now := time.Now()
if ago == 0 {
for k, v := range s.activeObjects.o {
if v.tid == id {
delete(s.activeObjects.o, k)
}
}
return
}

if id == 0 && ago > 1*time.Second {
for k, v := range s.activeObjects.o {
if now.Sub(v.insertAt) > ago {
delete(s.activeObjects.o, k)
}
}
return
}
for k, v := range s.activeObjects.o {
if v.tid == id && now.Sub(v.insertAt) > ago {
delete(s.activeObjects.o, k)
}
}
}

type activeEntry struct {
tid uint64
insertAt time.Time
}
Loading

0 comments on commit fa228fe

Please sign in to comment.