Skip to content

Commit

Permalink
Fix some CI error (#18281)
Browse files Browse the repository at this point in the history
## What type of PR is this?

- [ ] API-change
- [ ] BUG
- [ ] Improvement
- [ ] Documentation
- [ ] Feature
- [ ] Test and CI
- [ ] Code Refactoring

## Which issue(s) this PR fixes:

issue #

## What this PR does / why we need it:
  • Loading branch information
ouyuanning authored Aug 22, 2024
1 parent 333fae7 commit d50f2a5
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 19 deletions.
4 changes: 3 additions & 1 deletion pkg/sql/colexec/evalExpressionReset.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package colexec

import (
"context"

"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function"
Expand Down Expand Up @@ -75,7 +77,7 @@ func (fI *functionInformationForEval) reset() {
// get evalFn and freeFn from the function registry here.
if fI.evalFn != nil {
// we can set the context nil here since this function will never return an error.
overload, _ := function.GetFunctionById(nil, fI.overloadID)
overload, _ := function.GetFunctionById(context.TODO(), fI.overloadID)
fI.evalFn, fI.freeFn = overload.GetExecuteMethod()
}
}
Expand Down
14 changes: 11 additions & 3 deletions pkg/sql/colexec/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func init() {

func TestFilter(t *testing.T) {
for _, tc := range tcs {
resetChildren(tc.arg)
resetChildren(tc.arg, tc.proc)
err := tc.arg.Prepare(tc.proc)
require.NoError(t, err)
// 1. First call
Expand All @@ -205,7 +205,7 @@ func TestFilter(t *testing.T) {

//--------------------------------------------------------
// Re enable the operator after reset
resetChildren(tc.arg)
resetChildren(tc.arg, tc.proc)
err = tc.arg.Prepare(tc.proc)
require.NoError(t, err)
res, _ = tc.arg.Call(tc.proc)
Expand All @@ -214,6 +214,10 @@ func TestFilter(t *testing.T) {
} else {
require.Equal(t, res.Batch == nil, true)
}
for _, child := range tc.arg.Children {
child.Reset(tc.proc, false, nil)
child.Free(tc.proc, false, nil)
}
tc.arg.Reset(tc.proc, false, nil)
tc.arg.Free(tc.proc, false, nil)

Expand All @@ -222,7 +226,11 @@ func TestFilter(t *testing.T) {
}
}

func resetChildren(arg *Filter) {
func resetChildren(arg *Filter, proc *process.Process) {
for _, child := range arg.Children {
child.Reset(proc, false, nil)
child.Free(proc, false, nil)
}
bat0 := MakeFilterMockBatchs()
bat1 := MakeFilterMockBatchs()
op := colexec.NewMockOperator().WithBatchs([]*batch.Batch{bat0, bat1})
Expand Down
15 changes: 10 additions & 5 deletions pkg/sql/colexec/filter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,7 @@ func (filter *Filter) GetExeExpr() *plan.Expr {
}

func (filter *Filter) Reset(proc *process.Process, pipelineFailed bool, err error) {
for i := range filter.ctr.executors {
if filter.ctr.executors[i] != nil {
filter.ctr.executors[i].ResetForNextQuery()
}
}
filter.ctr.cleanExecutor() //todo need fix performance issue for executor mem reuse
filter.exeExpr = nil
}

Expand All @@ -102,3 +98,12 @@ func (ctr *container) cleanExecutor() {
}
ctr.executors = nil
}

// func (ctr *container) resetExecutor() {
// for i := range ctr.executors {
// if ctr.executors[i] != nil {
// ctr.executors[i].ResetForNextQuery()
// }
// }
// ctr.executors = nil
// }
2 changes: 2 additions & 0 deletions pkg/sql/colexec/merge/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func (merge *Merge) Call(proc *process.Process) (vm.CallResult, error) {
bat, err = msg.Batch.Dup(proc.GetMPool())
} else {
bat, err = merge.ctr.buf.AppendWithCopy(proc.Ctx, proc.GetMPool(), msg.Batch)
bat.ShuffleIDX = msg.Batch.ShuffleIDX
bat.Recursive = msg.Batch.Recursive
}
if err != nil {
proc.PutBatch(msg.Batch)
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/colexec/merge/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func TestMerge(t *testing.T) {
if ok.Status == vm.ExecStop || err != nil {
break
}
if ok.Batch != nil {
ok.Batch.Clean(tc.proc.GetMPool())
}
}
tc.arg.Reset(tc.proc, false, nil)
// for i := 0; i < len(tc.proc.Reg.MergeReceivers); i++ { // simulating the end of a pipeline
Expand All @@ -103,6 +106,9 @@ func TestMerge(t *testing.T) {
if ok.Status == vm.ExecStop || err != nil {
break
}
if ok.Batch != nil {
ok.Batch.Clean(tc.proc.GetMPool())
}
}
tc.arg.Free(tc.proc, false, nil)
tc.proc.Free()
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/colexec/mergegroup/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,4 @@ func (ctr *container) initEmptyBatchFromInput(bat *batch.Batch) {
for i := range bat.Vecs {
ctr.bat.Vecs[i] = vector.NewVec(*bat.Vecs[i].GetType())
}
return
}
10 changes: 5 additions & 5 deletions pkg/sql/colexec/table_function/table_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ func (tableFunction *TableFunction) Prepare(proc *process.Process) error {
return err
}

func (tf *TableFunction) createResultBatch() *batch.Batch {
bat := batch.NewWithSize(len(tf.Attrs))
bat.Attrs = tf.Attrs
for i := range tf.ctr.retSchema {
bat.Vecs[i] = vector.NewVec(tf.ctr.retSchema[i])
func (tableFunction *TableFunction) createResultBatch() *batch.Batch {
bat := batch.NewWithSize(len(tableFunction.Attrs))
bat.Attrs = tableFunction.Attrs
for i := range tableFunction.ctr.retSchema {
bat.Vecs[i] = vector.NewVec(tableFunction.ctr.retSchema[i])
}
return bat
}
11 changes: 7 additions & 4 deletions pkg/vm/engine/test/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,13 @@ func Test_ReaderCanReadCommittedInMemInsertAndDeletes(t *testing.T) {
disttaeEngine.Engine, 0)
require.NoError(t, err)

ret := batch.NewWithSize(len(schema.ColDefs))
for i, col := range schema.ColDefs {
vec := vector.NewVec(col.Type.Oid.ToType())
ret.Vecs[i] = vec
ret := batch.NewWithSize(1)
for _, col := range schema.ColDefs {
if col.Name == schema.ColDefs[primaryKeyIdx].Name {
vec := vector.NewVec(col.Type)
ret.Vecs[0] = vec
break
}
}
_, err = reader.Read(ctx, []string{schema.ColDefs[primaryKeyIdx].Name}, nil, mp, nil, ret)
require.NoError(t, err)
Expand Down

0 comments on commit d50f2a5

Please sign in to comment.