diff --git a/pkg/vm/engine/disttae/txn.go b/pkg/vm/engine/disttae/txn.go index ce1a4ae160c9b..998bc5279bf84 100644 --- a/pkg/vm/engine/disttae/txn.go +++ b/pkg/vm/engine/disttae/txn.go @@ -468,9 +468,7 @@ func (txn *Transaction) dumpBatchLocked(ctx context.Context, offset int) error { if !dumpAll { for i := offset; i < len(txn.writes); i++ { - if txn.writes[i].tableId == catalog.MO_DATABASE_ID || - txn.writes[i].tableId == catalog.MO_TABLES_ID || - txn.writes[i].tableId == catalog.MO_COLUMNS_ID { + if txn.writes[i].isCatalog() { continue } if txn.writes[i].bat == nil || txn.writes[i].bat.RowCount() == 0 { @@ -518,9 +516,9 @@ func (txn *Transaction) dumpBatchLocked(ctx context.Context, offset int) error { } func (txn *Transaction) dumpInsertBatchLocked(ctx context.Context, offset int, size *uint64, pkCount *int) error { - mp := make(map[tableKey][]*batch.Batch) lastWritesIndex := offset writes := txn.writes + mp := make(map[tableKey][]*batch.Batch) for i := offset; i < len(txn.writes); i++ { if txn.writes[i].isCatalog() { writes[lastWritesIndex] = writes[i] @@ -545,12 +543,12 @@ func (txn *Transaction) dumpInsertBatchLocked(ctx context.Context, offset int, s *size += uint64(bat.Size()) *pkCount += bat.RowCount() // skip rowid - newBat := batch.NewWithSize(len(bat.Vecs) - 1) - newBat.SetAttributes(bat.Attrs[1:]) - newBat.Vecs = bat.Vecs[1:] - newBat.SetRowCount(bat.Vecs[0].Length()) - mp[tbKey] = append(mp[tbKey], newBat) - txn.toFreeBatches[tbKey] = append(txn.toFreeBatches[tbKey], bat) + newBatch := batch.NewWithSize(len(bat.Vecs) - 1) + newBatch.SetAttributes(bat.Attrs[1:]) + newBatch.Vecs = bat.Vecs[1:] + newBatch.SetRowCount(bat.Vecs[0].Length()) + mp[tbKey] = append(mp[tbKey], newBatch) + defer bat.Clean(txn.proc.GetMPool()) keepElement = false } @@ -626,9 +624,9 @@ func (txn *Transaction) dumpInsertBatchLocked(ctx context.Context, offset int, s func (txn *Transaction) dumpDeleteBatchLocked(ctx context.Context, offset int, size *uint64) error { deleteCnt := 0 - mp := make(map[tableKey][]*batch.Batch) lastWritesIndex := offset writes := txn.writes + mp := make(map[tableKey][]*batch.Batch) for i := offset; i < len(txn.writes); i++ { if txn.writes[i].isCatalog() { writes[lastWritesIndex] = writes[i] @@ -659,7 +657,7 @@ func (txn *Transaction) dumpDeleteBatchLocked(ctx context.Context, offset int, s newBat.SetRowCount(bat.Vecs[0].Length()) mp[tbKey] = append(mp[tbKey], newBat) - txn.toFreeBatches[tbKey] = append(txn.toFreeBatches[tbKey], bat) + defer bat.Clean(txn.proc.GetMPool()) keepElement = false } @@ -1402,7 +1400,6 @@ func (txn *Transaction) delTransaction() { } txn.writes[i].bat.Clean(txn.proc.Mp()) } - txn.CleanToFreeBatches() txn.tableCache = nil txn.tableOps = nil txn.databaseMap = nil @@ -1476,7 +1473,6 @@ func (txn *Transaction) CloneSnapshotWS() client.Workspace { }, cnBlkId_Pos: map[types.Blockid]Pos{}, batchSelectList: make(map[*batch.Batch][]int64), - toFreeBatches: make(map[tableKey][]*batch.Batch), cn_flushed_s3_tombstone_object_stats_list: new(sync.Map), } @@ -1497,15 +1493,6 @@ func (txn *Transaction) GetHaveDDL() bool { return txn.haveDDL.Load() } -func (txn *Transaction) CleanToFreeBatches() { - for key := range txn.toFreeBatches { - for _, bat := range txn.toFreeBatches[key] { - bat.Clean(txn.proc.Mp()) - } - delete(txn.toFreeBatches, key) - } -} - func newTableOps() *tableOpsChain { return &tableOpsChain{ names: make(map[tableKey][]tableOp), diff --git a/pkg/vm/engine/disttae/types.go b/pkg/vm/engine/disttae/types.go index c5bc5360f99b7..2f77c23d67a73 100644 --- a/pkg/vm/engine/disttae/types.go +++ b/pkg/vm/engine/disttae/types.go @@ -317,7 +317,6 @@ type Transaction struct { //} //select list for raw batch comes from txn.writes.batch. batchSelectList map[*batch.Batch][]int64 - toFreeBatches map[tableKey][]*batch.Batch rollbackCount int //current statement id @@ -428,7 +427,6 @@ func NewTxnWorkSpace(eng *Engine, proc *process.Process) *Transaction { }, cnBlkId_Pos: map[types.Blockid]Pos{}, batchSelectList: make(map[*batch.Batch][]int64), - toFreeBatches: make(map[tableKey][]*batch.Batch), syncCommittedTSCount: eng.cli.GetSyncLatestCommitTSTimes(), cn_flushed_s3_tombstone_object_stats_list: new(sync.Map), } @@ -479,7 +477,7 @@ func (txn *Transaction) PPString() string { return buf.String() } - return fmt.Sprintf("Transaction{writes: %v, batchSelectList: %v, tableOps:%v, tablesInVain: %v, tableCache: %v, toFreeBatches: %v, insertCount: %v, snapshotWriteOffset: %v, rollbackCount: %v, statementID: %v, offsets: %v, timestamps: %v}", + return fmt.Sprintf("Transaction{writes: %v, batchSelectList: %v, tableOps:%v, tablesInVain: %v, tableCache: %v, insertCount: %v, snapshotWriteOffset: %v, rollbackCount: %v, statementID: %v, offsets: %v, timestamps: %v}", writesString, stringifyMap(txn.batchSelectList, func(k, v any) string { return fmt.Sprintf("%p:%v", k, len(v.([]int64))) @@ -489,7 +487,6 @@ func (txn *Transaction) PPString() string { return fmt.Sprintf("%v:%v", k.(uint64), v.(int)) }), stringifySyncMap(txn.tableCache), - len(txn.toFreeBatches), txn.approximateInMemInsertCnt, txn.snapshotWriteOffset, txn.rollbackCount, @@ -530,8 +527,6 @@ func (txn *Transaction) IncrStatementID(ctx context.Context, commit bool) error txn.Lock() defer txn.Unlock() - //free batches - txn.CleanToFreeBatches() //merge writes for the last statement if err := txn.mergeTxnWorkspaceLocked(ctx); err != nil { return err @@ -779,8 +774,6 @@ func (txn *Transaction) RollbackLastStatement(ctx context.Context) error { afterEntries = len(txn.writes) - txn.CleanToFreeBatches() - for i := len(txn.restoreTxnTableFunc) - 1; i >= 0; i-- { txn.restoreTxnTableFunc[i]() }