Skip to content

Commit

Permalink
Print tn config when opening and replace numCPU with procs (matrixori…
Browse files Browse the repository at this point in the history
…gin#15913)

the output of config string will be like:

```
{"level":"INFO","time":"2024/05/08 17:59:37.438673 +0800","caller":"db/open.go:93","msg":"open-tae","operation":"Config","toml":"MaxMessageSize = 9223372036854775807\nTransferTableTTL = \"1m30s\"\nIncrementalDedup = true\nIsStandalone = true\nLogStoreT = \"logservice\"\n\n[storage-cfg]\n  block-max-rows = 8192\n  object-max-blocks = 256\n\n[checkpoint-cfg]\n  flush-inerterval = \"1m0s\"\n  checkpoint-min-count = 100\n  scan-interval = \"5s\"\n  checkpoint-incremental-interval = \"1m0s\"\n  checkpoint-global-interval = 40\n  overall-flush-mem-control = 1073741824\n  ForceUpdateGlobalInterval = false\n  GlobalVersionInterval = \"1h0m0s\"\n  GCCheckpointInterval = \"1m0s\"\n  DisableGCCheckpoint = false\n  ReservedWALEntryCount = 5000\n  BlockRows = 0\n  Size = 0\n\n[scheduler-cfg]\n  io-workers = 16\n  async-workers = 6\n\n[gc-cfg]\n  gc-ttl = \"1h0m0s\"\n  scan-gc-interval = \"30m0s\"\n  disable-gc = false\n\n[LogtailCfg]\n  page-size = 100\n\n[MergeCfg]\n  CNMergeMemControlHint = 8589934592\n  CNTakeOverAll = false\n  CNTakeOverExceed = 83886080000\n  CNStandaloneTake = false\n\n[CatalogCfg]\n  GCInterval = \"30m0s\"\n  DisableGC = false\n"}
```

Approved by: @XuPeng-SH
  • Loading branch information
aptend authored May 10, 2024
1 parent a2036c3 commit 1b00a08
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 24 deletions.
5 changes: 3 additions & 2 deletions pkg/vm/engine/tae/blockio/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,12 @@ func NewIOPipeline(
}

func (p *IoPipeline) fillDefaults() {
procs := runtime.GOMAXPROCS(0)
if p.options.fetchParallism <= 0 {
p.options.fetchParallism = runtime.NumCPU() * 4
p.options.fetchParallism = procs * 4
}
if p.options.prefetchParallism <= 0 {
p.options.prefetchParallism = runtime.NumCPU() * 4
p.options.prefetchParallism = procs * 4
}
if p.options.queueDepth <= 0 {
p.options.queueDepth = 100000
Expand Down
8 changes: 4 additions & 4 deletions pkg/vm/engine/tae/db/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/txn/txnbase"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/txn/txnimpl"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal"
"go.uber.org/zap"
)

const (
Expand All @@ -60,9 +59,6 @@ func fillRuntimeOptions(opts *options.Options) {
if opts.MergeCfg.CNStandaloneTake {
common.ShouldStandaloneCNTakeOver.Store(true)
}
w := &bytes.Buffer{}
toml.NewEncoder(w).Encode(opts.MergeCfg)
logutil.Info("mergeblocks options", zap.String("toml", w.String()), zap.Bool("standalone", opts.IsStandalone))
}

func Open(ctx context.Context, dirname string, opts *options.Options) (db *DB, err error) {
Expand All @@ -88,6 +84,10 @@ func Open(ctx context.Context, dirname string, opts *options.Options) (db *DB, e
opts = opts.FillDefaults(dirname)
fillRuntimeOptions(opts)

wbuf := &bytes.Buffer{}
werr := toml.NewEncoder(wbuf).Encode(opts)
logutil.Info("open-tae", common.OperationField("Config"),
common.AnyField("toml", wbuf.String()), common.ErrorField(werr))
serviceDir := path.Join(dirname, "data")
if opts.Fs == nil {
// TODO:fileservice needs to be passed in as a parameter
Expand Down
7 changes: 4 additions & 3 deletions pkg/vm/engine/tae/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,11 @@ func (o *Options) FillDefaults(dirname string) *Options {

if o.SchedulerCfg == nil {
ioworkers := DefaultIOWorkers
if ioworkers < runtime.NumCPU() {
ioworkers = min(runtime.NumCPU(), 100)
procs := runtime.GOMAXPROCS(0)
if ioworkers < procs {
ioworkers = min(procs, 100)
}
workers := min(runtime.NumCPU()/2, 100)
workers := min(procs/2, 100)
if workers < 1 {
workers = 1
}
Expand Down
22 changes: 10 additions & 12 deletions pkg/vm/engine/tae/options/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,17 @@ type Options struct {
MergeCfg *MergeConfig
CatalogCfg *CatalogCfg

// MaxMessageSize is the size of max message which is sent to log-service.
MaxMessageSize uint64
TransferTableTTL time.Duration

IncrementalDedup bool
IsStandalone bool

Clock clock.Clock
Fs fileservice.FileService
Lc logservicedriver.LogServiceClientFactory
Shard metadata.TNShard
LogStoreT LogstoreType
Ctx context.Context
// MaxMessageSize is the size of max message which is sent to log-service.
MaxMessageSize uint64

TaskServiceGetter taskservice.Getter
LogStoreT LogstoreType

Fs fileservice.FileService `toml:"-"`
Lc logservicedriver.LogServiceClientFactory `toml:"-"`
Ctx context.Context `toml:"-"`
Shard metadata.TNShard `toml:"-"`
Clock clock.Clock `toml:"-"`
TaskServiceGetter taskservice.Getter `toml:"-"`
}
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/rpc/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func mockTAEHandle(ctx context.Context, t *testing.T, opts *options.Options) *mo
mh.Handle = &Handle{
db: tae,
}
mh.Handle.txnCtxs = common.NewMap[string, *txnContext](runtime.NumCPU())
mh.Handle.txnCtxs = common.NewMap[string, *txnContext](runtime.GOMAXPROCS(0))
return mh
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/rpc/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func NewTAEHandle(ctx context.Context, path string, opt *options.Options) *Handl
h := &Handle{
db: tae,
}
h.txnCtxs = common.NewMap[string, *txnContext](runtime.NumCPU())
h.txnCtxs = common.NewMap[string, *txnContext](runtime.GOMAXPROCS(0))

h.GCManager = gc.NewManager(
gc.WithCronJob(
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/txn/txnbase/txnmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func NewTxnManager(txnStoreFactory TxnStoreFactory, txnFactory TxnFactory, clock
mgr.PreparingSM = sm.NewStateMachine(new(sync.WaitGroup), mgr, pqueue, prepareWALQueue)

mgr.ctx, mgr.cancel = context.WithCancel(context.Background())
mgr.workers, _ = ants.NewPool(runtime.NumCPU())
mgr.workers, _ = ants.NewPool(runtime.GOMAXPROCS(0))
return mgr
}

Expand Down

0 comments on commit 1b00a08

Please sign in to comment.