Skip to content

Commit

Permalink
[Feature] support sample function. (matrixorigin#12813)
Browse files Browse the repository at this point in the history
Support the sample function, which can only be used in the SELECT clause.
Usage:
```
select
sample(expression-list, N rows / K percent)
from table
[ where clause ]
[ group by clause ]
```
N is the number of rows to sample.
K is the probability, as a percentage, that each row is picked.

This function samples the table data, after the where-condition filter has been applied, according to the values of expression-list. It returns N rows for each sample column, or picks rows by probability.
If all values of expression-list are null, the row will not be picked.

For other details, see the test file 'sample_func.sql'.

There are two known problems:
1. select sample(*) is not implemented.
2. There is a bug: an empty result is returned when sampling from a data source whose rows are all invalid to pick. The correct behavior is to return one invalid row.

Approved by: @iamlinjunhong, @heni02, @XuPeng-SH, @ouyuanning, @badboynt1, @nnsgmsone
  • Loading branch information
m-schen authored Nov 17, 2023
1 parent d9fa6aa commit 58b6314
Show file tree
Hide file tree
Showing 26 changed files with 10,312 additions and 7,305 deletions.
955 changes: 672 additions & 283 deletions pkg/pb/pipeline/pipeline.pb.go

Large diffs are not rendered by default.

1,850 changes: 1,055 additions & 795 deletions pkg/pb/plan/plan.pb.go

Large diffs are not rendered by default.

209 changes: 209 additions & 0 deletions pkg/sql/colexec/sample/sample.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sample

import (
"bytes"
"fmt"
"github.com/matrixorigin/matrixone/pkg/common/hashmap"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/vm"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

// String appends a short, human-readable description of this sample
// operator to buf. The text depends on arg.Type; an unrecognized type is
// reported as "unknown sample type".
func (arg *Argument) String(buf *bytes.Buffer) {
	switch arg.Type {
	case mergeSampleByRow:
		fmt.Fprintf(buf, "merge sample %d rows ", arg.Rows)
	case sampleByRow:
		fmt.Fprintf(buf, " sample %d rows ", arg.Rows)
	case sampleByPercent:
		fmt.Fprintf(buf, " sample %.2f percent ", arg.Percents)
	default:
		buf.WriteString("unknown sample type")
	}
}

// Prepare initializes the operator state before the first Call.
//
// It allocates a fresh container, selects the sample pool that matches
// arg.Type, and builds expression executors for the sample expressions and,
// when group-by expressions are present, for the group-by expressions too.
//
// An internal error is returned for an unknown arg.Type, and any error from
// building an expression executor is returned unchanged.
func (arg *Argument) Prepare(proc *process.Process) (err error) {
	sampleCount := len(arg.SampleExprs)
	groupCount := len(arg.GroupExprs)

	ctr := &container{
		isGroupBy:     groupCount != 0,
		isMultiSample: sampleCount > 1,
		tempBatch1:    make([]*batch.Batch, 1),
		sampleVectors: make([]*vector.Vector, sampleCount),
	}
	arg.ctr = ctr

	// Pick the pool implementation for the requested sampling mode.
	switch arg.Type {
	case sampleByRow:
		ctr.samplePool = newSamplePoolByRows(proc, arg.Rows, sampleCount, true)
	case sampleByPercent:
		ctr.samplePool = newSamplePoolByPercent(proc, arg.Percents, sampleCount)
	case mergeSampleByRow:
		ctr.samplePool = newSamplePoolByRows(proc, arg.Rows, sampleCount, false)
	default:
		return moerr.NewInternalErrorNoCtx(fmt.Sprintf("unknown sample type %d", arg.Type))
	}

	// Executors for the sampled columns.
	ctr.sampleExecutors, err = colexec.NewExpressionExecutorsFromPlanExpressions(proc, arg.SampleExprs)
	if err != nil {
		return err
	}

	// Without a group-by clause there is nothing else to set up;
	// groupVectorsNullable keeps its zero value (false).
	if !ctr.isGroupBy {
		return nil
	}

	// Executors and result slots for the group-by columns.
	ctr.groupExecutors = make([]colexec.ExpressionExecutor, groupCount)
	for idx := range arg.GroupExprs {
		if ctr.groupExecutors[idx], err = colexec.NewExpressionExecutor(proc, arg.GroupExprs[idx]); err != nil {
			return err
		}
	}
	ctr.groupVectors = make([]*vector.Vector, groupCount)

	// Keys of at most 8 bytes use the integer hash map; wider keys use the
	// string hash map.
	width, nullable := getGroupKeyWidth(arg.GroupExprs)
	ctr.useIntHashMap = width <= 8
	ctr.groupVectorsNullable = nullable

	return nil
}

// Call runs one step of the sample operator.
//
// It pulls the next result from the first child operator, hands the batch
// kept from the previous call back to proc, and remembers the new one in
// arg.buf. A nil batch from the child is treated as the end of input and the
// pool is asked for its output with the flag set to true. Otherwise a
// non-empty batch is evaluated and fed to the sample pool (per group when
// group-by expressions exist), and the pool's output for this step is
// returned.
//
// On any error the child's result is returned together with that error.
func (arg *Argument) Call(proc *process.Process) (vm.CallResult, error) {
	res, err := arg.children[0].Call(proc)
	if err != nil {
		return res, err
	}

	// Release the batch held since the previous call before keeping the new one.
	if arg.buf != nil {
		proc.PutBatch(arg.buf)
	}
	input := res.Batch
	arg.buf = input

	state := arg.ctr
	if input == nil {
		// NOTE(review): the `true` argument presumably marks the final
		// output request — confirm against the sample pool implementation.
		res.Batch, err = state.samplePool.Output(true)
		return res, err
	}

	if !input.IsEmpty() {
		if err = state.evaluateSampleAndGroupByColumns(proc, input); err != nil {
			return res, err
		}

		switch {
		case state.isGroupBy:
			err = state.hashAndSample(input, arg.IBucket, arg.NBucket, proc.Mp())
		default:
			err = state.samplePool.Sample(1, state.sampleVectors, nil, input)
		}
		if err != nil {
			return res, err
		}
	}

	res.Batch, err = state.samplePool.Output(false)
	return res, err
}

// getGroupKeyWidth estimates the byte width of the composite group-by key
// built from exprList and reports whether any group-by expression is
// nullable.
//
// Per expression the width is the type's TypeLen; for types whose
// FixedLength is negative it is the declared Typ.Width, or 128 when no width
// is declared. When the key is nullable, one extra byte is added for every
// expression.
//
// The nullable flag is determined over the whole list before any width is
// summed. Previously the extra byte was gated on a running flag, so
// expressions listed before the first nullable one were not charged and the
// result depended on column order. Because the caller applies the single
// returned flag to the whole hash map, that could underestimate the key
// width and select the integer hash map for keys wider than 8 bytes.
// NOTE(review): assumes the hash map reserves one extra byte per key column
// when created as nullable — confirm against the hashmap package.
func getGroupKeyWidth(exprList []*plan.Expr) (keyWidth int, groupKeyNullable bool) {
	// Pass 1: the key is nullable as soon as one expression is nullable.
	for _, expr := range exprList {
		if !expr.Typ.NotNullable {
			groupKeyNullable = true
			break
		}
	}

	// Pass 2: sum the widths, charging the extra byte uniformly.
	for _, expr := range exprList {
		typeID := types.T(expr.Typ.Id)
		width := typeID.TypeLen()
		if typeID.FixedLength() < 0 {
			// Variable-length type: use the declared width, or a default of 128.
			width = 128
			if expr.Typ.Width != 0 {
				width = int(expr.Typ.Width)
			}
		}
		if groupKeyNullable {
			width++
		}
		keyWidth += width
	}
	return keyWidth, groupKeyNullable
}

// evaluateSampleAndGroupByColumns evaluates every sample expression and then
// every group-by expression against bat, storing the resulting vectors in
// ctr.sampleVectors and ctr.groupVectors respectively. It stops at, and
// returns, the first evaluation error.
func (ctr *container) evaluateSampleAndGroupByColumns(proc *process.Process, bat *batch.Batch) (err error) {
	// The executors take a slice of batches; reuse the one-element scratch slice.
	ctr.tempBatch1[0] = bat

	// Sample columns first.
	for idx := range ctr.sampleExecutors {
		if ctr.sampleVectors[idx], err = ctr.sampleExecutors[idx].Eval(proc, ctr.tempBatch1); err != nil {
			return err
		}
	}

	// Then the group-by columns (none when there is no group-by clause).
	for idx := range ctr.groupExecutors {
		if ctr.groupVectors[idx], err = ctr.groupExecutors[idx].Eval(proc, ctr.tempBatch1); err != nil {
			return err
		}
	}
	return nil
}

// hashAndSample assigns each row of bat to a group by inserting the group-by
// vectors into the container's hash map, then passes the rows with their
// group ids to the sample pool.
//
// The hash map (integer or string keyed, depending on ctr.useIntHashMap) is
// created on first use with ib, nb and mp. Rows are processed in chunks of
// at most hashmap.UnitLimit. The first error from map creation, insertion or
// sampling is returned.
func (ctr *container) hashAndSample(bat *batch.Batch, ib, nb int, mp *mpool.MPool) (err error) {
	rowTotal := bat.RowCount()

	// Lazily build the hash map and obtain an iterator over it.
	var itr hashmap.Iterator
	switch {
	case ctr.useIntHashMap:
		if ctr.intHashMap == nil {
			if ctr.intHashMap, err = hashmap.NewIntHashMap(ctr.groupVectorsNullable, uint64(ib), uint64(nb), mp); err != nil {
				return err
			}
		}
		itr = ctr.intHashMap.NewIterator()
	default:
		if ctr.strHashMap == nil {
			if ctr.strHashMap, err = hashmap.NewStrMap(ctr.groupVectorsNullable, uint64(ib), uint64(nb), mp); err != nil {
				return err
			}
		}
		itr = ctr.strHashMap.NewIterator()
	}

	var groupIDs []uint64
	for offset := 0; offset < rowTotal; offset += hashmap.UnitLimit {
		// Clamp the last chunk to the rows that remain.
		chunk := rowTotal - offset
		if chunk > hashmap.UnitLimit {
			chunk = hashmap.UnitLimit
		}

		if groupIDs, _, err = itr.Insert(offset, chunk, ctr.groupVectors); err != nil {
			return err
		}
		if err = ctr.samplePool.BatchSample(chunk, groupIDs, ctr.sampleVectors, ctr.groupVectors, bat); err != nil {
			return err
		}
	}
	return nil
}
Loading

0 comments on commit 58b6314

Please sign in to comment.