Skip to content

Commit

Permalink
optimize workspace dump memory usage (matrixorigin#20779)
Browse files Browse the repository at this point in the history
Immediately free the batch that has been flushed when workspace dump

Approved by: @triump2020, @sukki37
  • Loading branch information
Wenbin1002 authored Dec 17, 2024
1 parent ec334ef commit 2c297e3
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 31 deletions.
33 changes: 10 additions & 23 deletions pkg/vm/engine/disttae/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,9 +468,7 @@ func (txn *Transaction) dumpBatchLocked(ctx context.Context, offset int) error {

if !dumpAll {
for i := offset; i < len(txn.writes); i++ {
if txn.writes[i].tableId == catalog.MO_DATABASE_ID ||
txn.writes[i].tableId == catalog.MO_TABLES_ID ||
txn.writes[i].tableId == catalog.MO_COLUMNS_ID {
if txn.writes[i].isCatalog() {
continue
}
if txn.writes[i].bat == nil || txn.writes[i].bat.RowCount() == 0 {
Expand Down Expand Up @@ -518,9 +516,9 @@ func (txn *Transaction) dumpBatchLocked(ctx context.Context, offset int) error {
}

func (txn *Transaction) dumpInsertBatchLocked(ctx context.Context, offset int, size *uint64, pkCount *int) error {
mp := make(map[tableKey][]*batch.Batch)
lastWritesIndex := offset
writes := txn.writes
mp := make(map[tableKey][]*batch.Batch)
for i := offset; i < len(txn.writes); i++ {
if txn.writes[i].isCatalog() {
writes[lastWritesIndex] = writes[i]
Expand All @@ -545,12 +543,12 @@ func (txn *Transaction) dumpInsertBatchLocked(ctx context.Context, offset int, s
*size += uint64(bat.Size())
*pkCount += bat.RowCount()
// skip rowid
newBat := batch.NewWithSize(len(bat.Vecs) - 1)
newBat.SetAttributes(bat.Attrs[1:])
newBat.Vecs = bat.Vecs[1:]
newBat.SetRowCount(bat.Vecs[0].Length())
mp[tbKey] = append(mp[tbKey], newBat)
txn.toFreeBatches[tbKey] = append(txn.toFreeBatches[tbKey], bat)
newBatch := batch.NewWithSize(len(bat.Vecs) - 1)
newBatch.SetAttributes(bat.Attrs[1:])
newBatch.Vecs = bat.Vecs[1:]
newBatch.SetRowCount(bat.Vecs[0].Length())
mp[tbKey] = append(mp[tbKey], newBatch)
defer bat.Clean(txn.proc.GetMPool())

keepElement = false
}
Expand Down Expand Up @@ -626,9 +624,9 @@ func (txn *Transaction) dumpInsertBatchLocked(ctx context.Context, offset int, s

func (txn *Transaction) dumpDeleteBatchLocked(ctx context.Context, offset int, size *uint64) error {
deleteCnt := 0
mp := make(map[tableKey][]*batch.Batch)
lastWritesIndex := offset
writes := txn.writes
mp := make(map[tableKey][]*batch.Batch)
for i := offset; i < len(txn.writes); i++ {
if txn.writes[i].isCatalog() {
writes[lastWritesIndex] = writes[i]
Expand Down Expand Up @@ -659,7 +657,7 @@ func (txn *Transaction) dumpDeleteBatchLocked(ctx context.Context, offset int, s
newBat.SetRowCount(bat.Vecs[0].Length())

mp[tbKey] = append(mp[tbKey], newBat)
txn.toFreeBatches[tbKey] = append(txn.toFreeBatches[tbKey], bat)
defer bat.Clean(txn.proc.GetMPool())

keepElement = false
}
Expand Down Expand Up @@ -1398,7 +1396,6 @@ func (txn *Transaction) delTransaction() {
}
txn.writes[i].bat.Clean(txn.proc.Mp())
}
txn.CleanToFreeBatches()
txn.tableCache = nil
txn.tableOps = nil
txn.databaseMap = nil
Expand Down Expand Up @@ -1472,7 +1469,6 @@ func (txn *Transaction) CloneSnapshotWS() client.Workspace {
},
cnBlkId_Pos: map[types.Blockid]Pos{},
batchSelectList: make(map[*batch.Batch][]int64),
toFreeBatches: make(map[tableKey][]*batch.Batch),
cn_flushed_s3_tombstone_object_stats_list: new(sync.Map),
writeWorkspaceThreshold: txn.writeWorkspaceThreshold,
}
Expand All @@ -1494,15 +1490,6 @@ func (txn *Transaction) GetHaveDDL() bool {
return txn.haveDDL.Load()
}

func (txn *Transaction) CleanToFreeBatches() {
for key := range txn.toFreeBatches {
for _, bat := range txn.toFreeBatches[key] {
bat.Clean(txn.proc.Mp())
}
delete(txn.toFreeBatches, key)
}
}

func newTableOps() *tableOpsChain {
return &tableOpsChain{
names: make(map[tableKey][]tableOp),
Expand Down
9 changes: 1 addition & 8 deletions pkg/vm/engine/disttae/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ type Transaction struct {
//}
//select list for raw batch comes from txn.writes.batch.
batchSelectList map[*batch.Batch][]int64
toFreeBatches map[tableKey][]*batch.Batch

rollbackCount int
//current statement id
Expand Down Expand Up @@ -454,7 +453,6 @@ func NewTxnWorkSpace(eng *Engine, proc *process.Process) *Transaction {
},
cnBlkId_Pos: map[types.Blockid]Pos{},
batchSelectList: make(map[*batch.Batch][]int64),
toFreeBatches: make(map[tableKey][]*batch.Batch),
syncCommittedTSCount: eng.cli.GetSyncLatestCommitTSTimes(),
cn_flushed_s3_tombstone_object_stats_list: new(sync.Map),

Expand Down Expand Up @@ -507,7 +505,7 @@ func (txn *Transaction) PPString() string {
return buf.String()
}

return fmt.Sprintf("Transaction{writes: %v, batchSelectList: %v, tableOps:%v, tablesInVain: %v, tableCache: %v, toFreeBatches: %v, insertCount: %v, snapshotWriteOffset: %v, rollbackCount: %v, statementID: %v, offsets: %v, timestamps: %v}",
return fmt.Sprintf("Transaction{writes: %v, batchSelectList: %v, tableOps:%v, tablesInVain: %v, tableCache: %v, insertCount: %v, snapshotWriteOffset: %v, rollbackCount: %v, statementID: %v, offsets: %v, timestamps: %v}",
writesString,
stringifyMap(txn.batchSelectList, func(k, v any) string {
return fmt.Sprintf("%p:%v", k, len(v.([]int64)))
Expand All @@ -517,7 +515,6 @@ func (txn *Transaction) PPString() string {
return fmt.Sprintf("%v:%v", k.(uint64), v.(int))
}),
stringifySyncMap(txn.tableCache),
len(txn.toFreeBatches),
txn.approximateInMemInsertCnt,
txn.snapshotWriteOffset,
txn.rollbackCount,
Expand Down Expand Up @@ -558,8 +555,6 @@ func (txn *Transaction) IncrStatementID(ctx context.Context, commit bool) error

txn.Lock()
defer txn.Unlock()
//free batches
txn.CleanToFreeBatches()
//merge writes for the last statement
if err := txn.mergeTxnWorkspaceLocked(ctx); err != nil {
return err
Expand Down Expand Up @@ -807,8 +802,6 @@ func (txn *Transaction) RollbackLastStatement(ctx context.Context) error {

afterEntries = len(txn.writes)

txn.CleanToFreeBatches()

for i := len(txn.restoreTxnTableFunc) - 1; i >= 0; i-- {
txn.restoreTxnTableFunc[i]()
}
Expand Down

0 comments on commit 2c297e3

Please sign in to comment.