Skip to content

Commit

Permalink
Dynamic workspace threshold (#20848)
Browse files Browse the repository at this point in the history
支持workspace write threshold动态扩容

Approved by: @aptend, @zhangxu19830126, @LeftHandCold, @triump2020, @XuPeng-SH, @sukki37
  • Loading branch information
Wenbin1002 authored Dec 23, 2024
1 parent 0635430 commit 330ee11
Show file tree
Hide file tree
Showing 13 changed files with 348 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,22 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package merge
package objectio

import (
"syscall"
"unsafe"
)

func totalMem() uint64 {
func TotalMem() uint64 {
s, err := syscall.Sysctl("hw.memsize")
if err != nil {
return 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package merge
package objectio

import "syscall"

func totalMem() uint64 {
func TotalMem() uint64 {
in := new(syscall.Sysinfo_t)
err := syscall.Sysinfo(in)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions pkg/util/metric/v2/dashboard/grafana_dashboard_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (c *DashboardCreator) initTxnDashboard() error {
c.initTxnShowAccountsRow(),
c.initCNCommittedObjectQuantityRow(),
c.initTombstoneTransferRow(),
c.initTxnExtraWorkspaceQuota(),
)...)
if err != nil {
return err
Expand Down Expand Up @@ -570,3 +571,14 @@ func (c *DashboardCreator) initTombstoneTransferRow() dashboard.Option {
),
)
}

func (c *DashboardCreator) initTxnExtraWorkspaceQuota() dashboard.Option {
return dashboard.Row(
"Extra Workspace Quota",
c.withGraph(
"Extra Workspace Quota",
12,
`sum(`+c.getMetricWithFilter("mo_txn_extra_workspace_quota", ``)+`)`,
"{{ "+c.by+" }}", axis.Unit("decbytes")),
)
}
1 change: 1 addition & 0 deletions pkg/util/metric/v2/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func initTxnMetrics() {
registry.MustRegister(txnReaderTombstoneSelectivityHistogram)
registry.MustRegister(txnTransferDurationHistogram)
registry.MustRegister(TransferTombstonesCountHistogram)
registry.MustRegister(TxnExtraWorkspaceQuotaGauge)
}

func initRPCMetrics() {
Expand Down
10 changes: 10 additions & 0 deletions pkg/util/metric/v2/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,3 +397,13 @@ var (
TransferTombstonesDurationHistogram = txnTransferDurationHistogram.WithLabelValues("tombstones")
BatchTransferTombstonesDurationHistogram = txnTransferDurationHistogram.WithLabelValues("batch")
)

var (
TxnExtraWorkspaceQuotaGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "mo",
Subsystem: "txn",
Name: "extra_workspace_quota",
Help: "Extra workspace quota for txn.",
})
)
31 changes: 31 additions & 0 deletions pkg/vm/engine/disttae/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/lockservice"
"github.com/matrixorigin/matrixone/pkg/logservice"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
pb "github.com/matrixorigin/matrixone/pkg/pb/statsinfo"
Expand Down Expand Up @@ -163,15 +164,24 @@ func (e *Engine) fillDefaults() {
if e.config.writeWorkspaceThreshold <= 0 {
e.config.writeWorkspaceThreshold = WriteWorkspaceThreshold
}
if e.config.extraWorkspaceThreshold <= 0 {
e.config.extraWorkspaceThreshold = ExtraWorkspaceThreshold
}
if e.config.cnTransferTxnLifespanThreshold <= 0 {
e.config.cnTransferTxnLifespanThreshold = CNTransferTxnLifespanThreshold
}
if e.config.quota.Load() <= 0 {
mem := objectio.TotalMem() / 100 * 5
e.config.quota.Store(mem)
v2.TxnExtraWorkspaceQuotaGauge.Set(float64(mem))
}

logutil.Info(
"INIT-ENGINE-CONFIG",
zap.Int("InsertEntryMaxCount", e.config.insertEntryMaxCount),
zap.Uint64("CommitWorkspaceThreshold", e.config.commitWorkspaceThreshold),
zap.Uint64("WriteWorkspaceThreshold", e.config.writeWorkspaceThreshold),
zap.Uint64("ExtraWorkspaceThresholdQuota", e.config.quota.Load()),
zap.Duration("CNTransferTxnLifespanThreshold", e.config.cnTransferTxnLifespanThreshold),
)
}
Expand All @@ -191,6 +201,27 @@ func (e *Engine) SetWorkspaceThreshold(commitThreshold, writeThreshold uint64) (
return
}

func (e *Engine) AcquireQuota(v uint64) (uint64, bool) {
for {
oldRemaining := e.config.quota.Load()
if oldRemaining < v {
return 0, false
}
remaining := oldRemaining - v
if e.config.quota.CompareAndSwap(oldRemaining, remaining) {
v2.TxnExtraWorkspaceQuotaGauge.Set(float64(remaining))
return remaining, true
}
}
}

func (e *Engine) ReleaseQuota(quota uint64) (remaining uint64) {
e.config.quota.Add(quota)
remaining = e.config.quota.Load()
v2.TxnExtraWorkspaceQuotaGauge.Set(float64(remaining))
return
}

func (e *Engine) GetService() string {
return e.service
}
Expand Down
37 changes: 37 additions & 0 deletions pkg/vm/engine/disttae/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,13 +482,40 @@ func (txn *Transaction) dumpBatchLocked(ctx context.Context, offset int) error {
if size < txn.writeWorkspaceThreshold {
return nil
}

if size < txn.engine.config.extraWorkspaceThreshold {
// try to increase the write threshold from quota, if failed, then dump all
// acquire 5M more than we need
quota := size - txn.writeWorkspaceThreshold + txn.engine.config.writeWorkspaceThreshold
remaining, acquired := txn.engine.AcquireQuota(quota)
if acquired {
logutil.Info(
"WORKSPACE-QUOTA-ACQUIRE",
zap.Uint64("quota", quota),
zap.Uint64("remaining", remaining),
)
txn.writeWorkspaceThreshold += quota
txn.extraWriteWorkspaceThreshold += quota
return nil
}
}
size = 0
}
txn.hasS3Op.Store(true)

if err := txn.dumpInsertBatchLocked(ctx, offset, &size, &pkCount); err != nil {
return err
}
// release the extra quota
if txn.extraWriteWorkspaceThreshold > 0 {
remaining := txn.engine.ReleaseQuota(txn.extraWriteWorkspaceThreshold)
logutil.Info(
"WORKSPACE-QUOTA-RELEASE",
zap.Uint64("quota", txn.extraWriteWorkspaceThreshold),
zap.Uint64("remaining", remaining),
)
txn.extraWriteWorkspaceThreshold = 0
}

if dumpAll {
if txn.approximateInMemDeleteCnt >= txn.engine.config.insertEntryMaxCount {
Expand Down Expand Up @@ -1462,6 +1489,16 @@ func (txn *Transaction) delTransaction() {
txn.transfer.timestamps = nil
txn.transfer.lastTransferred = types.TS{}
txn.transfer.pendingTransfer = false

if txn.extraWriteWorkspaceThreshold > 0 {
remaining := txn.engine.ReleaseQuota(txn.extraWriteWorkspaceThreshold)
logutil.Info(
"WORKSPACE-QUOTA-RELEASE",
zap.Uint64("quota", txn.extraWriteWorkspaceThreshold),
zap.Uint64("remaining", remaining),
)
txn.extraWriteWorkspaceThreshold = 0
}
}

func (txn *Transaction) rollbackTableOpLocked() {
Expand Down
15 changes: 12 additions & 3 deletions pkg/vm/engine/disttae/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ const (
const (
CommitWorkspaceThreshold uint64 = 1 * mpool.MB
WriteWorkspaceThreshold uint64 = 5 * mpool.MB
ExtraWorkspaceThreshold uint64 = 500 * mpool.MB
InsertEntryThreshold = 5000
GCBatchOfFileCount int = 1000
GCPoolSize int = 5
Expand Down Expand Up @@ -168,6 +169,12 @@ func WithWriteWorkspaceThreshold(th uint64) EngineOptions {
}
}

func WithExtraWorkspaceThresholdQuota(quota uint64) EngineOptions {
return func(e *Engine) {
e.config.quota.Store(quota)
}
}

func WithInsertEntryMaxCount(th int) EngineOptions {
return func(e *Engine) {
e.config.insertEntryMaxCount = th
Expand Down Expand Up @@ -215,6 +222,8 @@ type Engine struct {
insertEntryMaxCount int
commitWorkspaceThreshold uint64
writeWorkspaceThreshold uint64
extraWorkspaceThreshold uint64
quota atomic.Uint64

cnTransferTxnLifespanThreshold time.Duration

Expand Down Expand Up @@ -364,8 +373,9 @@ type Transaction struct {

haveDDL atomic.Bool

writeWorkspaceThreshold uint64
commitWorkspaceThreshold uint64
writeWorkspaceThreshold uint64
commitWorkspaceThreshold uint64
extraWriteWorkspaceThreshold uint64 // acquired from engine quota
}

type Pos struct {
Expand Down Expand Up @@ -423,7 +433,6 @@ func (b *deletedBlocks) iter(fn func(*types.Blockid, []int64) bool) {
func NewTxnWorkSpace(eng *Engine, proc *process.Process) *Transaction {
id := objectio.NewSegmentid()
bytes := types.EncodeUuid(id)

txn := &Transaction{
proc: proc,
engine: eng,
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/db/merge/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (c *resourceController) setMemLimit(total uint64) {

func (c *resourceController) refresh() {
if c.limit == 0 {
c.setMemLimit(totalMem())
c.setMemLimit(objectio.TotalMem())
}

if c.proc == nil {
Expand Down
Loading

0 comments on commit 330ee11

Please sign in to comment.