Skip to content

Commit

Permalink
remove some logic from multi_update (#20788) (#20795)
Browse files Browse the repository at this point in the history
remove some logic from multi_update (#20788)

Approved by: @XuPeng-SH, @sukki37
  • Loading branch information
ouyuanning authored Dec 17, 2024
1 parent 1739052 commit 91ea798
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 38 deletions.
23 changes: 4 additions & 19 deletions pkg/container/batch/compact_batchs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"

"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/vector"
)

const (
Expand All @@ -31,7 +30,6 @@ const (
// until bats.Batchs[lastIdx].rowCount to DefaultBatchMaxRow
type CompactBatchs struct {
batchs []*Batch
ufs []func(*vector.Vector, *vector.Vector) error // functions for vector union
}

func NewCompactBatchs() *CompactBatchs {
Expand Down Expand Up @@ -160,13 +158,6 @@ func (bats *CompactBatchs) fillData(mpool *mpool.MPool, inBatch *Batch) error {
var tmpBat *Batch
var err error

if len(bats.ufs) == 0 {
for i := 0; i < inBatch.VectorCount(); i++ {
typ := *inBatch.GetVector(int32(i)).GetType()
bats.ufs = append(bats.ufs, vector.GetUnionAllFunction(typ, mpool))
}
}

//fill data
start, end := 0, inBatch.RowCount()
isNewBat := false
Expand All @@ -193,21 +184,15 @@ func (bats *CompactBatchs) fillData(mpool *mpool.MPool, inBatch *Batch) error {
return err
}
}
tmpBat.AddRowCount(addRowCount)
} else {
for i := range tmpBat.Vecs {
srcVec, err := inBatch.Vecs[i].Window(start, start+addRowCount)
if err != nil {
return err
}
err = bats.ufs[i](tmpBat.Vecs[i], srcVec)
if err != nil {
return err
}
err := tmpBat.UnionWindow(inBatch, start, addRowCount, mpool)
if err != nil {
return err
}
}

start = start + addRowCount
tmpBat.AddRowCount(addRowCount)
}

return nil
Expand Down
7 changes: 2 additions & 5 deletions pkg/sql/colexec/multi_update/s3writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,6 @@ func (writer *s3Writer) prepareDeleteBatchs(

for _, bat := range src {
rowIDVec := bat.GetVector(RowIDIdx)
if rowIDVec.IsConstNull() {
continue
}
nulls := rowIDVec.GetNulls()
if nulls.Count() == bat.RowCount() {
continue
Expand Down Expand Up @@ -272,7 +269,7 @@ func (writer *s3Writer) sortAndSync(proc *process.Process, analyzer process.Anal
if len(updateCtx.DeleteCols) > 0 {
var delBatchs []*batch.Batch
if parititionCount == 0 {
bats, err = fetchSomeVecFromCompactBatchs(proc, writer.cacheBatchs, updateCtx.DeleteCols, DeleteBatchAttrs)
bats, err = fetchSomeVecFromCompactBatchs(writer.cacheBatchs, updateCtx.DeleteCols, DeleteBatchAttrs)
if err != nil {
return
}
Expand Down Expand Up @@ -341,7 +338,7 @@ func (writer *s3Writer) sortAndSync(proc *process.Process, analyzer process.Anal
}
}
}
bats, err = fetchSomeVecFromCompactBatchs(proc, writer.cacheBatchs, updateCtx.InsertCols, insertAttrs)
bats, err = fetchSomeVecFromCompactBatchs(writer.cacheBatchs, updateCtx.InsertCols, insertAttrs)
needSortBatch = false
needCleanBatch = false
}
Expand Down
15 changes: 1 addition & 14 deletions pkg/sql/colexec/multi_update/s3writer_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,9 @@ func cloneSomeVecFromCompactBatchs(
// fetchSomeVecFromCompactBatchs fetch some vectors from CompactBatchs
// do not clean these batchs
func fetchSomeVecFromCompactBatchs(
proc *process.Process,
src *batch.CompactBatchs,
cols []int,
attrs []string) ([]*batch.Batch, error) {
mp := proc.GetMPool()
var newBat *batch.Batch
retBats := make([]*batch.Batch, src.Length())
for i := 0; i < src.Length(); i++ {
Expand All @@ -258,18 +256,7 @@ func fetchSomeVecFromCompactBatchs(
newBat.Attrs = attrs
for j, idx := range cols {
oldVec := oldBat.Vecs[idx]
//expand constant vector
if oldVec.IsConst() {
newVec := vector.NewVec(*oldVec.GetType())
err := vector.GetUnionAllFunction(*oldVec.GetType(), mp)(newVec, oldVec)
if err != nil {
return nil, err
}
oldBat.ReplaceVector(oldVec, newVec, 0)
newBat.Vecs[j] = newVec
} else {
newBat.Vecs[j] = oldVec
}
newBat.Vecs[j] = oldVec
}
newBat.SetRowCount(newBat.Vecs[0].Length())
retBats[i] = newBat
Expand Down

0 comments on commit 91ea798

Please sign in to comment.