Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic workspace threshold #20843

Merged
merged 14 commits into from
Dec 23, 2024
12 changes: 12 additions & 0 deletions pkg/util/metric/v2/dashboard/grafana_dashboard_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (c *DashboardCreator) initTxnDashboard() error {
c.initTxnShowAccountsRow(),
c.initCNCommittedObjectQuantityRow(),
c.initTombstoneTransferRow(),
c.initTxnExtraWorkspaceQuota(),
c.initTxnCheckPKChangedRow(),
)...)
if err != nil {
Expand Down Expand Up @@ -590,3 +591,14 @@ func (c *DashboardCreator) initTombstoneTransferRow() dashboard.Option {
),
)
}

func (c *DashboardCreator) initTxnExtraWorkspaceQuota() dashboard.Option {
return dashboard.Row(
"Extra Workspace Quota",
c.withGraph(
"Extra Workspace Quota",
12,
`sum(`+c.getMetricWithFilter("mo_txn_extra_workspace_quota", ``)+`)`,
"{{ "+c.by+" }}", axis.Unit("decbytes")),
)
}
1 change: 1 addition & 0 deletions pkg/util/metric/v2/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func initTxnMetrics() {
registry.MustRegister(txnReaderTombstoneSelectivityHistogram)
registry.MustRegister(txnTransferDurationHistogram)
registry.MustRegister(TransferTombstonesCountHistogram)
registry.MustRegister(TxnExtraWorkspaceQuotaGauge)
}

func initRPCMetrics() {
Expand Down
10 changes: 10 additions & 0 deletions pkg/util/metric/v2/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,3 +408,13 @@ var (
TransferTombstonesDurationHistogram = txnTransferDurationHistogram.WithLabelValues("tombstones")
BatchTransferTombstonesDurationHistogram = txnTransferDurationHistogram.WithLabelValues("batch")
)

var (
TxnExtraWorkspaceQuotaGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "mo",
Subsystem: "txn",
Name: "extra_workspace_quota",
Help: "Extra workspace quota for txn.",
})
)
40 changes: 40 additions & 0 deletions pkg/vm/engine/disttae/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,18 @@ func (e *Engine) fillDefaults() {
if e.config.cnTransferTxnLifespanThreshold <= 0 {
e.config.cnTransferTxnLifespanThreshold = CNTransferTxnLifespanThreshold
}
if e.config.extraWorkspaceQuota.Load() <= 0 {
mem := totalMem() / 100 * 5
e.config.extraWorkspaceQuota.Store(mem)
v2.TxnExtraWorkspaceQuotaGauge.Set(float64(mem))
}

logutil.Info(
"INIT-ENGINE-CONFIG",
zap.Int("InsertEntryMaxCount", e.config.insertEntryMaxCount),
zap.Uint64("CommitWorkspaceThreshold", e.config.commitWorkspaceThreshold),
zap.Uint64("WriteWorkspaceThreshold", e.config.writeWorkspaceThreshold),
zap.Uint64("ExtraWorkspaceThresholdQuota", e.config.extraWorkspaceQuota.Load()),
zap.Duration("CNTransferTxnLifespanThreshold", e.config.cnTransferTxnLifespanThreshold),
)
}
Expand All @@ -191,6 +197,40 @@ func (e *Engine) SetWorkspaceThreshold(commitThreshold, writeThreshold uint64) (
return
}

func (e *Engine) AcquireQuota(v uint64, quota *MemoryQuota) (uint64, bool) {
for {
oldRemaining := e.config.extraWorkspaceQuota.Load()
if oldRemaining < v {
return 0, false
}
remaining := oldRemaining - v
if e.config.extraWorkspaceQuota.CompareAndSwap(oldRemaining, remaining) {
quota.Apply(v)
v2.TxnExtraWorkspaceQuotaGauge.Set(float64(remaining))
logutil.Info(
"WORKSPACE-QUOTA-ACQUIRE",
zap.Uint64("quota", v),
zap.Uint64("remaining", remaining),
)
return remaining, true
}
}
}

func (e *Engine) ReleaseQuota(quota *MemoryQuota) {
size := quota.Release()
if size == 0 {
return
}
e.config.extraWorkspaceQuota.Add(size)
v2.TxnExtraWorkspaceQuotaGauge.Set(float64(e.config.extraWorkspaceQuota.Load()))
logutil.Info(
"WORKSPACE-QUOTA-RELEASE",
zap.Uint64("quota", size),
zap.Uint64("remaining", e.config.extraWorkspaceQuota.Load()),
)
}

func (e *Engine) GetService() string {
return e.service
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/vm/engine/disttae/meminfo_darwin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package disttae

import (
"syscall"
"unsafe"
)

func totalMem() uint64 {
s, err := syscall.Sysctl("hw.memsize")
if err != nil {
return 0
}
// hack because the string conversion above drops a \0
b := []byte(s)
if len(b) < 8 {
b = append(b, 0)
}
return *(*uint64)(unsafe.Pointer(&b[0]))
}
26 changes: 26 additions & 0 deletions pkg/vm/engine/disttae/meminfo_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package disttae

import "syscall"

func totalMem() uint64 {
in := new(syscall.Sysinfo_t)
err := syscall.Sysinfo(in)
if err != nil {
return 0
}
return in.Totalram * uint64(in.Unit)
}
13 changes: 13 additions & 0 deletions pkg/vm/engine/disttae/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,17 @@ func (txn *Transaction) dumpBatchLocked(ctx context.Context, offset int) error {
if size < txn.writeWorkspaceThreshold {
return nil
}

// try to increase the write threshold from quota
threshold := txn.writeWorkspaceThreshold
for size >= threshold {
threshold *= 2
}
_, acquired := txn.engine.AcquireQuota(threshold-txn.writeWorkspaceThreshold, &txn.quota)
if acquired {
txn.writeWorkspaceThreshold = threshold
return nil
}
size = 0
}
txn.hasS3Op.Store(true)
Expand Down Expand Up @@ -1463,6 +1474,8 @@ func (txn *Transaction) delTransaction() {
txn.transfer.timestamps = nil
txn.transfer.lastTransferred = types.TS{}
txn.transfer.pendingTransfer = false

txn.engine.ReleaseQuota(&txn.quota)
}

func (txn *Transaction) rollbackTableOpLocked() {
Expand Down
32 changes: 32 additions & 0 deletions pkg/vm/engine/disttae/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ func WithWriteWorkspaceThreshold(th uint64) EngineOptions {
}
}

func WithExtraWorkspaceThresholdQuota(quota uint64) EngineOptions {
return func(e *Engine) {
e.config.extraWorkspaceQuota.Store(quota)
}
}

func WithInsertEntryMaxCount(th int) EngineOptions {
return func(e *Engine) {
e.config.insertEntryMaxCount = th
Expand Down Expand Up @@ -198,6 +204,29 @@ func WithMoServerStateChecker(checker func() bool) EngineOptions {
}
}

type MemoryQuota struct {
state uint8 // active 0, released 1
size uint64
}

func (q *MemoryQuota) Apply(size uint64) bool {
if q.state != 0 {
return false
}
q.size += size
return true
}

func (q *MemoryQuota) Release() (size uint64) {
if q.state == 1 || q.size == 0 {
return 0
}
q.state = 1
size = q.size
q.size = 0
return size
}

type Engine struct {
sync.RWMutex
service string
Expand All @@ -215,6 +244,7 @@ type Engine struct {
insertEntryMaxCount int
commitWorkspaceThreshold uint64
writeWorkspaceThreshold uint64
extraWorkspaceQuota atomic.Uint64

cnTransferTxnLifespanThreshold time.Duration

Expand Down Expand Up @@ -366,6 +396,8 @@ type Transaction struct {

writeWorkspaceThreshold uint64
commitWorkspaceThreshold uint64

quota MemoryQuota
}

type Pos struct {
Expand Down
Loading
Loading