sync to 1.2: adding in filter for the partition state rows scan #16077

Merged: 15 commits, May 14, 2024
196 changes: 138 additions & 58 deletions pkg/vm/engine/disttae/logtailreplay/rows_iter.go
@@ -16,7 +16,6 @@ package logtailreplay

import (
"bytes"

"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/tidwall/btree"
)
@@ -110,43 +109,153 @@ func (p *rowsIter) Close() error {
}

type primaryKeyIter struct {
ts types.TS
spec PrimaryKeyMatchSpec
iter btree.IterG[*PrimaryIndexEntry]
firstCalled bool
rows *btree.BTreeG[RowEntry]
curRow RowEntry
ts types.TS
spec PrimaryKeyMatchSpec
iter btree.IterG[*PrimaryIndexEntry]
rows *btree.BTreeG[RowEntry]
primaryIndex *btree.BTreeG[*PrimaryIndexEntry]
curRow RowEntry
}

type PrimaryKeyMatchSpec struct {
Seek []byte
Match func(key []byte) bool
// Move advances the iterator to the next entry matching the spec and reports whether one was found
Move func(p *primaryKeyIter) bool
Name string
}
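
Note: the matching spec is no longer a declarative Seek/Match pair; Move is a stateful closure that both positions the underlying B-tree iterator and reports whether the entry it landed on still matches. A rough sketch of that contract, using a hypothetical MatchAll spec that is not part of this PR and simply visits every entry of the primary index:

// MatchAll is a hypothetical spec illustrating the Move contract:
// return true while the iterator is positioned on a matching entry,
// false once iteration should stop. The captured first flag makes each
// spec value single-use, just like Exact and Prefix below.
func MatchAll() PrimaryKeyMatchSpec {
	first := true
	return PrimaryKeyMatchSpec{
		Name: "MatchAll",
		Move: func(p *primaryKeyIter) bool {
			if first {
				first = false
				return p.iter.First()
			}
			return p.iter.Next()
		},
	}
}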

func Exact(key []byte) PrimaryKeyMatchSpec {
first := true
return PrimaryKeyMatchSpec{
Seek: key,
Match: func(k []byte) bool {
return bytes.Equal(k, key)
Name: "Exact",
Move: func(p *primaryKeyIter) bool {
var ok bool
if first {
first = false
ok = p.iter.Seek(&PrimaryIndexEntry{
Bytes: key,
})
} else {
ok = p.iter.Next()
}

if !ok {
return false
}

item := p.iter.Item()
return bytes.Equal(item.Bytes, key)
},
}
}

func Prefix(prefix []byte) PrimaryKeyMatchSpec {
first := true
return PrimaryKeyMatchSpec{
Seek: prefix,
Match: func(k []byte) bool {
return bytes.HasPrefix(k, prefix)
Name: "Prefix",
Move: func(p *primaryKeyIter) bool {
var ok bool
if first {
first = false
ok = p.iter.Seek(&PrimaryIndexEntry{
Bytes: prefix,
})
} else {
ok = p.iter.Next()
}

if !ok {
return false
}

item := p.iter.Item()
return bytes.HasPrefix(item.Bytes, prefix)
},
}
}

func MinMax(min []byte, max []byte) PrimaryKeyMatchSpec {
return PrimaryKeyMatchSpec{}
}

type phase int

const (
scan phase = 0
seek phase = 1
judge phase = 2
)

func ExactIn(encodes [][]byte) PrimaryKeyMatchSpec {
var encoded []byte

first := true
iterateAll := false

idx := 0
vecLen := len(encodes)
currentPhase := seek

updateEncoded := func() bool {
if idx >= vecLen {
return false
}
encoded = encodes[idx]
idx++
return true
}

match := func(key []byte) bool {
return bytes.Equal(key, encoded)
}

return PrimaryKeyMatchSpec{
Seek: min,
Match: func(k []byte) bool {
return bytes.Compare(min, k) <= 0 &&
bytes.Compare(k, max) <= 0
Name: "ExactIn",
Move: func(p *primaryKeyIter) bool {
if first {
first = false
// each Seek may visit up to tree-height items,
// so if seeking once per key would cost more than scanning the whole index, scan everything instead
if len(encodes)*p.primaryIndex.Height() > p.primaryIndex.Len() {
iterateAll = true
}
}

for {
switch currentPhase {
case judge:
if iterateAll {
if !updateEncoded() {
return false
}
currentPhase = scan
} else {
currentPhase = seek
}

case seek:
if !updateEncoded() {
// no more keys in the input vector
return false
}
if !p.iter.Seek(&PrimaryIndexEntry{Bytes: encoded}) {
return false
}
if match(p.iter.Item().Bytes) {
currentPhase = scan
return true
}

case scan:
if !p.iter.Next() {
return false
}
if match(p.iter.Item().Bytes) {
return true
}
p.iter.Prev()
currentPhase = judge
}
}
},
}
}
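
Note on the heuristic above: a B-tree Seek visits on the order of Height() nodes, so probing every key in encodes costs roughly len(encodes) * Height() item visits, while one pass over the index costs Len() visits. When the former is larger, ExactIn sets iterateAll and the judge phase advances to the next wanted key while continuing the forward scan, instead of re-seeking for it. For example, 1,000 keys against a tree of height 5 is about 5,000 visits by seeking, so an index holding only 2,000 entries is cheaper to scan once. A standalone sketch of the same seek-versus-scan decision over a plain sorted slice (hypothetical helper, not code from this PR):

package main

import (
	"fmt"
	"sort"
)

// lookupAll reports which wanted keys exist in sorted, choosing between
// per-key binary searches and a single merge-style scan, mirroring the
// seek-vs-scan trade-off in ExactIn. It sorts wanted in place.
func lookupAll(sorted, wanted []int) map[int]bool {
	found := make(map[int]bool, len(wanted))
	height := 1
	for n := len(sorted); n > 1; n /= 2 {
		height++ // rough stand-in for the B-tree height
	}
	if len(wanted)*height > len(sorted) {
		// "iterateAll": walk the data once, advancing through wanted keys
		sort.Ints(wanted)
		i := 0
		for _, v := range sorted {
			for i < len(wanted) && wanted[i] < v {
				i++
			}
			if i < len(wanted) && wanted[i] == v {
				found[v] = true
			}
		}
		return found
	}
	// "seek": one binary search per wanted key
	for _, k := range wanted {
		if j := sort.SearchInts(sorted, k); j < len(sorted) && sorted[j] == k {
			found[k] = true
		}
	}
	return found
}

func main() {
	fmt.Println(lookupAll([]int{1, 3, 5, 7, 9}, []int{3, 8, 9})) // map[3:true 9:true]
}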
@@ -155,40 +264,26 @@ func (p *PartitionState) NewPrimaryKeyIter(
ts types.TS,
spec PrimaryKeyMatchSpec,
) *primaryKeyIter {
iter := p.primaryIndex.Copy().Iter()
index := p.primaryIndex.Copy()
return &primaryKeyIter{
ts: ts,
spec: spec,
iter: iter,
rows: p.rows.Copy(),
ts: ts,
spec: spec,
iter: index.Iter(),
primaryIndex: index,
rows: p.rows.Copy(),
}
}
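
A minimal usage sketch of the new constructor together with the ExactIn spec, assuming the caller already holds a *PartitionState and a read timestamp, that the keys are encoded exactly as the primary index stores them, that the logtailreplay and types packages are imported, and that the iterator's Close comes from the RowsIter interface; countVisible itself is a hypothetical helper:

// countVisible counts row entries visible at ts whose primary keys appear
// in encodedPKs, letting the ExactIn spec drive the B-tree iterator.
func countVisible(state *logtailreplay.PartitionState, ts types.TS, encodedPKs [][]byte) int {
	iter := state.NewPrimaryKeyIter(ts, logtailreplay.ExactIn(encodedPKs))
	defer iter.Close()
	n := 0
	for iter.Next() {
		n++
	}
	return n
}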

var _ RowsIter = new(primaryKeyIter)

func (p *primaryKeyIter) Next() bool {
for {

if !p.firstCalled {
if !p.iter.Seek(&PrimaryIndexEntry{
Bytes: p.spec.Seek,
}) {
return false
}
p.firstCalled = true
} else {
if !p.iter.Next() {
return false
}
if !p.spec.Move(p) {
return false
}

entry := p.iter.Item()

if !p.spec.Match(entry.Bytes) {
// no more
return false
}

// validate
valid := false
rowsIter := p.rows.Iter()
@@ -265,27 +360,12 @@ var _ RowsIter = new(primaryKeyDelIter)

func (p *primaryKeyDelIter) Next() bool {
for {

if !p.firstCalled {
if !p.iter.Seek(&PrimaryIndexEntry{
Bytes: p.spec.Seek,
}) {
return false
}
p.firstCalled = true
} else {
if !p.iter.Next() {
return false
}
if !p.spec.Move(&p.primaryKeyIter) {
return false
}

entry := p.iter.Item()

if !p.spec.Match(entry.Bytes) {
// no more
return false
}

if entry.BlockID.Compare(p.bid) != 0 {
continue
}
13 changes: 5 additions & 8 deletions pkg/vm/engine/disttae/reader.go
@@ -51,7 +51,6 @@ func (mixin *withFilterMixin) reset() {
mixin.filterState.evaluated = false
mixin.filterState.filter = nil
mixin.columns.pkPos = -1
mixin.columns.rowidPos = -1
mixin.columns.indexOfFirstSortedColumn = -1
mixin.columns.seqnums = nil
mixin.columns.colTypes = nil
@@ -80,7 +79,6 @@ func (mixin *withFilterMixin) tryUpdateColumns(cols []string) {
mixin.columns.colTypes = make([]types.Type, len(cols))
// mixin.columns.colNulls = make([]bool, len(cols))
mixin.columns.pkPos = -1
mixin.columns.rowidPos = -1
mixin.columns.indexOfFirstSortedColumn = -1
compPKName2Pos := make(map[string]struct{})
positions := make(map[string]int)
@@ -92,7 +90,6 @@ func (mixin *withFilterMixin) tryUpdateColumns(cols []string) {
}
for i, column := range cols {
if column == catalog.Row_ID {
mixin.columns.rowidPos = i
mixin.columns.seqnums[i] = objectio.SEQNUM_ROWID
mixin.columns.colTypes[i] = objectio.RowidType
} else {
@@ -559,7 +556,7 @@ func (r *blockReader) gatherStats(lastNumRead, lastNumHit int64) {
func newBlockMergeReader(
ctx context.Context,
txnTable *txnTable,
encodedPrimaryKey []byte,
pkVal []byte,
ts timestamp.Timestamp,
dirtyBlks []*objectio.BlockInfo,
filterExpr *plan.Expr,
@@ -577,8 +574,8 @@
fs,
proc,
),
encodedPrimaryKey: encodedPrimaryKey,
deletaLocs: make(map[string][]objectio.Location),
pkVal: pkVal,
deletaLocs: make(map[string][]objectio.Location),
}
return r
}
@@ -662,10 +659,10 @@ func (r *blockMergeReader) loadDeletes(ctx context.Context, cols []string) error
}
ts := types.TimestampToTS(r.ts)

if filter != nil && info.Sorted && len(r.encodedPrimaryKey) > 0 {
if filter != nil && info.Sorted && len(r.pkVal) > 0 {
iter := state.NewPrimaryKeyDelIter(
ts,
logtailreplay.Prefix(r.encodedPrimaryKey),
logtailreplay.Prefix(r.pkVal),
info.BlockID,
)
for iter.Next() {