Overall mem cap for flushing (#15580)
optimize the case where there are too many active tables being inserted.

Approved by: @XuPeng-SH, @zhangxu19830126
aptend authored Apr 17, 2024
1 parent 2fef14c commit 7afccf9
Showing 8 changed files with 87 additions and 38 deletions.
13 changes: 7 additions & 6 deletions pkg/tnservice/cfg.go
@@ -110,12 +110,13 @@ type Config struct {
RPC rpc.Config `toml:"rpc"`

Ckp struct {
FlushInterval toml.Duration `toml:"flush-interval"`
ScanInterval toml.Duration `toml:"scan-interval"`
MinCount int64 `toml:"min-count"`
IncrementalInterval toml.Duration `toml:"incremental-interval"`
GlobalMinCount int64 `toml:"global-min-count"`
ReservedWALEntryCount uint64 `toml:"reserved-WAL-entry-count"`
FlushInterval toml.Duration `toml:"flush-interval"`
ScanInterval toml.Duration `toml:"scan-interval"`
MinCount int64 `toml:"min-count"`
IncrementalInterval toml.Duration `toml:"incremental-interval"`
GlobalMinCount int64 `toml:"global-min-count"`
ReservedWALEntryCount uint64 `toml:"reserved-WAL-entry-count"`
OverallFlushMemControl uint64 `toml:"overall-flush-mem-control"`
}

GCCfg struct {
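The new overall-flush-mem-control key is exposed through a struct tag like the existing checkpoint options. As a rough illustration of how such a key maps onto a uint64 field, here is a minimal sketch assuming a BurntSushi-style TOML decoder; the TN service's actual loader and section nesting are not shown and may differ, and the 1 GiB value is only an example.

package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

// ckpConfig mirrors only the new knob; the real struct is Config.Ckp in
// pkg/tnservice/cfg.go and carries many more fields.
type ckpConfig struct {
	OverallFlushMemControl uint64 `toml:"overall-flush-mem-control"`
}

func main() {
	// Hypothetical config fragment: the key name comes from the struct tag
	// above; the value caps overall flush memory at 1 GiB.
	const doc = `overall-flush-mem-control = 1073741824`

	var cfg ckpConfig
	if _, err := toml.Decode(doc, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.OverallFlushMemControl) // 1073741824
}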
13 changes: 7 additions & 6 deletions pkg/tnservice/factory.go
@@ -136,12 +136,13 @@ func (s *store) newTAEStorage(ctx context.Context, shard metadata.TNShard, facto
}

ckpcfg := &options.CheckpointCfg{
MinCount: s.cfg.Ckp.MinCount,
ScanInterval: s.cfg.Ckp.ScanInterval.Duration,
FlushInterval: s.cfg.Ckp.FlushInterval.Duration,
IncrementalInterval: s.cfg.Ckp.IncrementalInterval.Duration,
GlobalMinCount: s.cfg.Ckp.GlobalMinCount,
ReservedWALEntryCount: s.cfg.Ckp.ReservedWALEntryCount,
MinCount: s.cfg.Ckp.MinCount,
ScanInterval: s.cfg.Ckp.ScanInterval.Duration,
FlushInterval: s.cfg.Ckp.FlushInterval.Duration,
IncrementalInterval: s.cfg.Ckp.IncrementalInterval.Duration,
GlobalMinCount: s.cfg.Ckp.GlobalMinCount,
ReservedWALEntryCount: s.cfg.Ckp.ReservedWALEntryCount,
OverallFlushMemControl: s.cfg.Ckp.OverallFlushMemControl,
}

gcCfg := &options.GCCfg{
2 changes: 2 additions & 0 deletions pkg/vm/engine/tae/common/stats.go
@@ -48,6 +48,8 @@ var (
IsStandaloneBoost atomic.Bool
ShouldStandaloneCNTakeOver atomic.Bool
Epsilon float64

RuntimeOverallFlushMemCap atomic.Uint64
)

func init() {
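RuntimeOverallFlushMemCap follows the same pattern as the other runtime knobs in this file: a package-level atomic that is stored once at startup and read lock-free on hot paths. A self-contained sketch of that pattern, with an illustrative variable name and a hypothetical 1 GiB value rather than the engine's actual wiring:

package main

import (
	"fmt"
	"sync/atomic"
)

// A package-level atomic knob: one component stores it during startup,
// hot paths read it without taking a lock.
var overallFlushMemCap atomic.Uint64

func main() {
	// Set once from configuration (here: a hypothetical 1 GiB cap).
	overallFlushMemCap.Store(1 << 30)

	// Read lock-free from the flush scheduler's hot loop.
	fmt.Printf("flush mem cap: %d bytes\n", overallFlushMemCap.Load())
}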
90 changes: 64 additions & 26 deletions pkg/vm/engine/tae/db/checkpoint/runner.go
@@ -19,6 +19,7 @@ import (
"context"
"fmt"
"math/rand"
"slices"
"strconv"
"strings"
"sync"
@@ -78,6 +79,12 @@ type globalCheckpointContext struct {
ckpLSN uint64
}

type tableAndSize struct {
tbl *catalog.TableEntry
asize int
dsize int
}

// Q: What does runner do?
// A: A checkpoint runner organizes and manages all checkpoint-related behaviors. It roughly
// does the following things:
@@ -223,6 +230,8 @@ type runner struct {
postCheckpointQueue sm.Queue
gcCheckpointQueue sm.Queue

objMemSizeList []tableAndSize

onceStart sync.Once
onceStop sync.Once
}
@@ -900,55 +909,74 @@ func (r *runner) tryCompactTree(entry *logtail.DirtyTreeEntry, force bool) {
return
}
logutil.Debugf(entry.String())
visitor := new(model.BaseTreeVisitor)

visitor.TableFn = func(dbID, tableID uint64) error {
db, err := r.catalog.GetDatabaseByID(dbID)
r.objMemSizeList = r.objMemSizeList[:0]
sizevisitor := new(model.BaseTreeVisitor)
var totalSize, totalASize int
sizevisitor.TableFn = func(did, tid uint64) error {
db, err := r.catalog.GetDatabaseByID(did)
if err != nil {
panic(err)
}
table, err := db.GetTableEntryByID(tableID)
table, err := db.GetTableEntryByID(tid)
if err != nil {
panic(err)
}

if !table.Stats.Inited {
table.Stats.Lock()
table.Stats.InitWithLock(r.options.maxFlushInterval)
table.Stats.Unlock()
}
dirtyTree := entry.GetTree().GetTable(tid)
asize, dsize := r.EstimateTableMemSize(table, dirtyTree)
totalSize += asize + dsize
totalASize += asize
r.objMemSizeList = append(r.objMemSizeList, tableAndSize{table, asize, dsize})
return moerr.GetOkStopCurrRecur()
}
if err := entry.GetTree().Visit(sizevisitor); err != nil {
panic(err)
}

dirtyTree := entry.GetTree().GetTable(tableID)
_, endTs := entry.GetTimeRange()
slices.SortFunc(r.objMemSizeList, func(a, b tableAndSize) int {
if a.asize > b.asize {
return -1
} else if a.asize < b.asize {
return 1
} else {
return 0
}
})

asize, dsize := r.EstimateTableMemSize(table, dirtyTree)
pressure := float64(totalSize) / float64(common.RuntimeOverallFlushMemCap.Load())
if pressure > 1.0 {
pressure = 1.0
}

logutil.Infof("[flushtabletail] scan result: pressure %v, totalsize %v", pressure, common.HumanReadableBytes(totalSize))

for _, ticket := range r.objMemSizeList {
table, asize, dsize := ticket.tbl, ticket.asize, ticket.dsize
dirtyTree := entry.GetTree().GetTable(table.ID)
_, endTs := entry.GetTimeRange()

stats := table.Stats
stats.Lock()
defer stats.Unlock()

// debug log, delete later
if !stats.LastFlush.IsEmpty() && asize+dsize > 2*1000*1024 {
logutil.Infof("[flushtabletail] %v(%v) %v dels FlushCountDown %v",
table.GetLastestSchemaLocked().Name,
common.HumanReadableBytes(asize+dsize),
common.HumanReadableBytes(dsize),
time.Until(stats.FlushDeadline))
}

if force {
logutil.Infof("[flushtabletail] force flush %v-%s", table.ID, table.GetLastestSchemaLocked().Name)
if err := r.fireFlushTabletail(table, dirtyTree, endTs); err == nil {
stats.ResetDeadlineWithLock()
}
return moerr.GetOkStopCurrRecur()
continue
}

if stats.LastFlush.IsEmpty() {
// first boot, just bail out, and never enter this branch again
stats.LastFlush = stats.LastFlush.Next()
stats.ResetDeadlineWithLock()
return moerr.GetOkStopCurrRecur()
continue
}

flushReady := func() bool {
@@ -961,19 +989,29 @@ func (r *runner) tryCompactTree(entry *logtail.DirtyTreeEntry, force bool) {
if asize < common.Const1MBytes && dsize > 2*common.Const1MBytes+common.Const1MBytes/2 {
return true
}
if asize > common.Const1MBytes && rand.Float64() < pressure {
return true
}
return false
}

if flushReady() {
ready := flushReady()
// debug log, delete later
if !stats.LastFlush.IsEmpty() && asize+dsize > 2*1000*1024 {
logutil.Infof("[flushtabletail] %v(%v) %v dels FlushCountDown %v, flushReady %v",
table.GetLastestSchemaLocked().Name,
common.HumanReadableBytes(asize+dsize),
common.HumanReadableBytes(dsize),
time.Until(stats.FlushDeadline),
ready,
)
}

if ready {
if err := r.fireFlushTabletail(table, dirtyTree, endTs); err == nil {
stats.ResetDeadlineWithLock()
}
}

return moerr.GetOkStopCurrRecur()
}
if err := entry.GetTree().Visit(visitor); err != nil {
panic(err)
}
}

@@ -998,7 +1036,7 @@ func (r *runner) onDirtyEntries(entries ...any) {
}

func (r *runner) crontask(ctx context.Context) {
lag := 2 * time.Second
lag := 3 * time.Second
if r.options.maxFlushInterval < time.Second {
lag = 0 * time.Second
}
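The core of the change in tryCompactTree: gather per-table in-memory sizes, sort tables by appendable size descending, derive a pressure ratio from the overall cap, and let large tables flush with probability equal to that pressure. Below is a simplified, self-contained sketch of that policy using plain structs in place of the catalog types; the struct, constants, and function names are illustrative, and the deadline, absolute-size, and delete-heavy criteria of flushReady are omitted.

package main

import (
	"fmt"
	"math/rand"
	"slices"
)

// tableMem mirrors the idea of tableAndSize: per-table in-memory sizes of
// appendable data (asize) and deletes (dsize).
type tableMem struct {
	name  string
	asize int
	dsize int
}

const oneMB = 1 << 20

// pickFlushes sketches the pressure-based policy: compute overall memory
// pressure against a global cap, sort tables by appendable size descending,
// and let each sufficiently large table flush with probability equal to the
// pressure.
func pickFlushes(tables []tableMem, memCap uint64) []string {
	total := 0
	for _, t := range tables {
		total += t.asize + t.dsize
	}
	pressure := float64(total) / float64(memCap)
	if pressure > 1.0 {
		pressure = 1.0
	}

	// Largest appendable memory first, so heavy tables are considered early.
	slices.SortFunc(tables, func(a, b tableMem) int {
		return b.asize - a.asize
	})

	var toFlush []string
	for _, t := range tables {
		if t.asize > oneMB && rand.Float64() < pressure {
			toFlush = append(toFlush, t.name)
		}
	}
	return toFlush
}

func main() {
	tables := []tableMem{
		{"orders", 8 * oneMB, oneMB},
		{"events", 64 * oneMB, 2 * oneMB},
		{"tiny", 100, 0},
	}
	// A hypothetical 32 MiB cap pushes the example's pressure to 1.0,
	// so both large tables are selected and the tiny one is skipped.
	fmt.Println(pickFlushes(tables, 32*oneMB))
}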
1 change: 1 addition & 0 deletions pkg/vm/engine/tae/db/open.go
@@ -53,6 +53,7 @@ func fillRuntimeOptions(opts *options.Options) {
common.RuntimeCNMergeMemControl.Store(opts.MergeCfg.CNMergeMemControlHint)
common.RuntimeMinCNMergeSize.Store(opts.MergeCfg.CNTakeOverExceed)
common.RuntimeCNTakeOverAll.Store(opts.MergeCfg.CNTakeOverAll)
common.RuntimeOverallFlushMemCap.Store(opts.CheckpointCfg.OverallFlushMemControl)
if opts.IsStandalone {
common.IsStandaloneBoost.Store(true)
}
1 change: 1 addition & 0 deletions pkg/vm/engine/tae/options/cfg.go
@@ -39,6 +39,7 @@ type CheckpointCfg struct {
ScanInterval time.Duration `toml:"scan-interval"`
IncrementalInterval time.Duration `toml:"checkpoint-incremental-interval"`
GlobalMinCount int64 `toml:"checkpoint-global-interval"`
OverallFlushMemControl uint64 `toml:"overall-flush-mem-control"`
ForceUpdateGlobalInterval bool
GlobalVersionInterval time.Duration
GCCheckpointInterval time.Duration
3 changes: 3 additions & 0 deletions pkg/vm/engine/tae/options/options.go
@@ -156,6 +156,9 @@ func (o *Options) FillDefaults(dirname string) *Options {
if o.CheckpointCfg.GlobalMinCount <= 0 {
o.CheckpointCfg.GlobalMinCount = DefaultCheckpointMinCount
}
if o.CheckpointCfg.OverallFlushMemControl <= 0 {
o.CheckpointCfg.OverallFlushMemControl = DefaultOverallFlushMemControl
}
if o.CheckpointCfg.MinCount <= 0 {
o.CheckpointCfg.MinCount = DefaultCheckpointMinCount
}
2 changes: 2 additions & 0 deletions pkg/vm/engine/tae/options/types.go
@@ -24,6 +24,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"github.com/matrixorigin/matrixone/pkg/txn/clock"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/driver/logservicedriver"
)

@@ -42,6 +43,7 @@ const (
DefaultCheckpointGlobalMinCount = 10
DefaultGlobalVersionInterval = time.Hour
DefaultGCCheckpointInterval = time.Minute
DefaultOverallFlushMemControl = common.Const1GBytes

DefaultScanGCInterval = time.Minute * 30
DefaultGCTTL = time.Hour
