Skip to content

Commit

Permalink
fix big txn dump (matrixorigin#20899)
Browse files Browse the repository at this point in the history
修复了workspace 超过10M的大事务不刷盘的问题

Approved by: @triump2020, @XuPeng-SH
  • Loading branch information
Wenbin1002 authored Dec 24, 2024
1 parent e6c90d8 commit 0e2d0c1
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 46 deletions.
94 changes: 48 additions & 46 deletions pkg/vm/engine/disttae/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ func (txn *Transaction) dumpBatchLocked(ctx context.Context, offset int) error {
"WORKSPACE-QUOTA-ACQUIRE",
zap.Uint64("quota", quota),
zap.Uint64("remaining", remaining),
zap.String("txn", txn.op.Txn().DebugString()),
)
txn.writeWorkspaceThreshold += quota
txn.extraWriteWorkspaceThreshold += quota
Expand All @@ -513,8 +514,10 @@ func (txn *Transaction) dumpBatchLocked(ctx context.Context, offset int) error {
"WORKSPACE-QUOTA-RELEASE",
zap.Uint64("quota", txn.extraWriteWorkspaceThreshold),
zap.Uint64("remaining", remaining),
zap.String("txn", txn.op.Txn().DebugString()),
)
txn.extraWriteWorkspaceThreshold = 0
txn.writeWorkspaceThreshold = txn.engine.config.writeWorkspaceThreshold
}

if dumpAll {
Expand Down Expand Up @@ -573,30 +576,30 @@ func (txn *Transaction) dumpInsertBatchLocked(ctx context.Context, offset int, s
if tbCount[k] >= txn.engine.config.insertEntryMaxCount {
continue
}
if uint64(sum+tbSize[k]) >= txn.writeWorkspaceThreshold {
if uint64(sum+tbSize[k]) >= txn.commitWorkspaceThreshold {
break
}
sum += tbSize[k]
skipTable[k] = true
}

lastWritesIndex := offset
lastWriteIndex := offset
writes := txn.writes
mp := make(map[tableKey][]*batch.Batch)
for i := offset; i < len(txn.writes); i++ {
if skipTable[txn.writes[i].tableId] {
writes[lastWritesIndex] = writes[i]
lastWritesIndex++
writes[lastWriteIndex] = writes[i]
lastWriteIndex++
continue
}
if txn.writes[i].isCatalog() {
writes[lastWritesIndex] = writes[i]
lastWritesIndex++
writes[lastWriteIndex] = writes[i]
lastWriteIndex++
continue
}
if txn.writes[i].bat == nil || txn.writes[i].bat.RowCount() == 0 {
writes[lastWritesIndex] = writes[i]
lastWritesIndex++
writes[lastWriteIndex] = writes[i]
lastWriteIndex++
continue
}

Expand All @@ -612,23 +615,23 @@ func (txn *Transaction) dumpInsertBatchLocked(ctx context.Context, offset int, s
*size += uint64(bat.Size())
*pkCount += bat.RowCount()
// skip rowid
newBatch := batch.NewWithSize(len(bat.Vecs) - 1)
newBatch.SetAttributes(bat.Attrs[1:])
newBatch.Vecs = bat.Vecs[1:]
newBatch.SetRowCount(bat.Vecs[0].Length())
mp[tbKey] = append(mp[tbKey], newBatch)
newBat := batch.NewWithSize(len(bat.Vecs) - 1)
newBat.SetAttributes(bat.Attrs[1:])
newBat.Vecs = bat.Vecs[1:]
newBat.SetRowCount(bat.Vecs[0].Length())
mp[tbKey] = append(mp[tbKey], newBat)
defer bat.Clean(txn.proc.GetMPool())

keepElement = false
}

if keepElement {
writes[lastWritesIndex] = writes[i]
lastWritesIndex++
writes[lastWriteIndex] = writes[i]
lastWriteIndex++
}
}

txn.writes = writes[:lastWritesIndex]
txn.writes = writes[:lastWriteIndex]

for tbKey := range mp {
// scenario 2 for cn write s3, more info in the comment of S3Writer
Expand Down Expand Up @@ -693,18 +696,18 @@ func (txn *Transaction) dumpInsertBatchLocked(ctx context.Context, offset int, s

func (txn *Transaction) dumpDeleteBatchLocked(ctx context.Context, offset int, size *uint64) error {
deleteCnt := 0
lastWritesIndex := offset
lastWriteIndex := offset
writes := txn.writes
mp := make(map[tableKey][]*batch.Batch)
for i := offset; i < len(txn.writes); i++ {
if txn.writes[i].isCatalog() {
writes[lastWritesIndex] = writes[i]
lastWritesIndex++
writes[lastWriteIndex] = writes[i]
lastWriteIndex++
continue
}
if txn.writes[i].bat == nil || txn.writes[i].bat.RowCount() == 0 {
writes[lastWritesIndex] = writes[i]
lastWritesIndex++
writes[lastWriteIndex] = writes[i]
lastWriteIndex++
continue
}

Expand All @@ -720,24 +723,24 @@ func (txn *Transaction) dumpDeleteBatchLocked(ctx context.Context, offset int, s
deleteCnt += bat.RowCount()
*size += uint64(bat.Size())

newBat := batch.NewWithSize(len(bat.Vecs))
newBat.SetAttributes(bat.Attrs)
newBat.Vecs = bat.Vecs
newBat.SetRowCount(bat.Vecs[0].Length())
newBatch := batch.NewWithSize(len(bat.Vecs))
newBatch.SetAttributes(bat.Attrs)
newBatch.Vecs = bat.Vecs
newBatch.SetRowCount(bat.Vecs[0].Length())

mp[tbKey] = append(mp[tbKey], newBat)
mp[tbKey] = append(mp[tbKey], newBatch)
defer bat.Clean(txn.proc.GetMPool())

keepElement = false
}

if keepElement {
writes[lastWritesIndex] = writes[i]
lastWritesIndex++
writes[lastWriteIndex] = writes[i]
lastWriteIndex++
}
}

txn.writes = writes[:lastWritesIndex]
txn.writes = writes[:lastWriteIndex]

for tbKey := range mp {
// scenario 2 for cn write s3, more info in the comment of S3Writer
Expand Down Expand Up @@ -1309,6 +1312,22 @@ func (txn *Transaction) Commit(ctx context.Context) ([]txn.TxnRequest, error) {
return nil, nil
}

if err := txn.IncrStatementID(ctx, true); err != nil {
return nil, err
}

if err := txn.transferTombstonesByCommit(ctx); err != nil {
return nil, err
}

if err := txn.mergeTxnWorkspaceLocked(ctx); err != nil {
return nil, err
}
if err := txn.dumpBatchLocked(ctx, -1); err != nil {
return nil, err
}
txn.traceWorkspaceLocked(true)

if txn.workspaceSize > 10*mpool.MB {
logutil.Info(
"BIG-TXN",
Expand All @@ -1333,23 +1352,6 @@ func (txn *Transaction) Commit(ctx context.Context) ([]txn.TxnRequest, error) {
)
}

if err := txn.IncrStatementID(ctx, true); err != nil {
return nil, err
}

if err := txn.transferTombstonesByCommit(ctx); err != nil {
return nil, err
}

if err := txn.mergeTxnWorkspaceLocked(ctx); err != nil {
return nil, err
}
if err := txn.dumpBatchLocked(ctx, -1); err != nil {
return nil, err
}

txn.traceWorkspaceLocked(true)

if !txn.hasS3Op.Load() &&
txn.op.TxnOptions().CheckDupEnabled() {
if err := txn.checkDup(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/vm/engine/test/disttae_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1312,6 +1312,7 @@ func TestWorkspaceQuota2(t *testing.T) {
ctx,
testutil.TestOptions{},
t,
testutil.WithDisttaeEngineCommitWorkspaceThreshold(1),
testutil.WithDisttaeEngineWriteWorkspaceThreshold(1),
testutil.WithDisttaeEngineQuota(800),
)
Expand Down

0 comments on commit 0e2d0c1

Please sign in to comment.