Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

apply pk filter for mo_tables and mo_database #17892

Merged
merged 5 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions pkg/vm/engine/disttae/pk_filter_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,19 @@
package disttae

import (
"regexp"

"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

// specialPattern matches any table name that contains "mo_tables" or
// "mo_database"; the expression is unanchored, so a substring is enough.
//
// TODO(ghs): workaround for special tables, remove later.
// FIXME(ghs): remove this specialPattern later.
var specialPattern = regexp.MustCompile(`mo_tables|mo_database`)

func newBasePKFilter(
expr *plan.Expr,
tblDef *plan.TableDef,
proc *process.Process,
) (filter basePKFilter) {
if expr == nil || specialPattern.MatchString(tblDef.Name) {
if expr == nil {
return
}

Expand Down
22 changes: 15 additions & 7 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -2063,6 +2063,13 @@ func (tbl *txnTable) PKPersistedBetween(
//for sorted block, we can use binary search to find the keys.
if filter == nil {
filter = buildFilter()
if filter == nil {
logutil.Warn("build filter failed, switch to linear search",
zap.Uint32("accid", tbl.accountId),
zap.Uint64("tableid", tbl.tableId),
zap.String("tablename", tbl.tableName))
filter = buildUnsortedFilter()
}
}
sels := filter(bat.Vecs)
if len(sels) > 0 {
Expand Down Expand Up @@ -2101,13 +2108,14 @@ func (tbl *txnTable) PrimaryKeysMayBeModified(
if !flushed {
return false, nil
}
//for mo_tables, mo_database, mo_columns, pk always exist in memory.
if tbl.tableName == catalog.MO_DATABASE ||
tbl.tableName == catalog.MO_TABLES ||
tbl.tableName == catalog.MO_COLUMNS {
logutil.Warnf("mo table:%s always exist in memory", tbl.tableName)
return true, nil
}

// if tbl.tableName == catalog.MO_DATABASE ||
// tbl.tableName == catalog.MO_TABLES ||
// tbl.tableName == catalog.MO_COLUMNS {
// logutil.Warnf("mo table:%s always exist in memory", tbl.tableName)
// return true, nil
// }

//need check pk whether exist on S3 block.
return tbl.PKPersistedBetween(
snap,
Expand Down
27 changes: 0 additions & 27 deletions pkg/vm/engine/disttae/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1528,33 +1528,6 @@ func ConstructObjStatsByLoadObjMeta(
return
}

// removeIf filters data in place, dropping every element for which pred
// returns true, and returns the shortened prefix of the same backing array.
// The relative order of the surviving elements is preserved. pred is never
// called when data is empty.
func removeIf[T any](data []T, pred func(t T) bool) []T {
	if len(data) == 0 {
		return data
	}
	// kept aliases data, so appending never reallocates: at most len(data)
	// elements are written back into the original storage.
	kept := data[:0]
	for _, item := range data {
		if pred(item) {
			continue
		}
		kept = append(kept, item)
	}
	return kept
}

// find reports whether val is present as a key in data.
// Looking up a key in a nil or empty map simply yields "not found", so no
// separate emptiness check is needed.
func find[T ~string | ~int, S any](data map[T]S, val T) bool {
	_, exists := data[val]
	return exists
}

// txnIsValid
// if the workspace is nil or txnOp is aborted, it returns error
func txnIsValid(txnOp client.TxnOperator) (*Transaction, error) {
Expand Down
23 changes: 0 additions & 23 deletions pkg/vm/engine/disttae/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/testutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -851,28 +850,6 @@ func TestEvalExprListToVec(t *testing.T) {
require.Zero(t, m.CurrNB())
}

// Test_removeIf exercises removeIf with a partial removal, a removal of
// every element, and a nil input.
func Test_removeIf(t *testing.T) {
	strs := []string{"abc", "bc", "def"}

	// Remove a single element.
	dropOne := map[string]struct{}{"abc": {}}
	gotOne := removeIf(strs, func(s string) bool {
		return find(dropOne, s)
	})
	assert.Equal(t, []string{"bc", "def"}, gotOne)

	// removeIf compacts in place, so strs has already been rewritten by the
	// call above; every value it now holds is scheduled for removal.
	dropAll := make(map[string]struct{})
	for i := range strs {
		dropAll[strs[i]] = struct{}{}
	}
	gotAll := removeIf(strs, func(s string) bool {
		return find(dropAll, s)
	})
	assert.Equal(t, []string{}, gotAll)

	// A nil slice is returned untouched and the predicate is never invoked.
	assert.Equal(t, []string(nil), removeIf[string](nil, nil))
}

func Test_ConstructBasePKFilter(t *testing.T) {
m := mpool.MustNewNoFixed(t.Name())
proc := testutil.NewProcessWithMPool("", m)
Expand Down
27 changes: 26 additions & 1 deletion pkg/vm/engine/tae/blockio/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,23 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine"
)

// removeIf compacts data in place so that only the elements for which pred
// returns false remain, keeping their original order, and returns the
// shortened slice (which shares data's backing array). An empty input is
// returned as is without calling pred.
func removeIf[T any](data []T, pred func(t T) bool) []T {
	// from plan.RemoveIf
	n := len(data)
	if n == 0 {
		return data
	}
	write := 0
	for read := 0; read < n; read++ {
		if pred(data[read]) {
			continue
		}
		// Only move the element when a gap has opened up before it.
		if write != read {
			data[write] = data[read]
		}
		write++
	}
	return data[:write]
}

// ReadFilterSearchFuncType is the signature of a search function that takes
// a slice of vectors and returns a slice of int64 values.
// NOTE(review): presumably the returned values are the selected row offsets
// within the given vectors — confirm against the callers.
type ReadFilterSearchFuncType func([]*vector.Vector) []int64

type BlockReadFilter struct {
Expand All @@ -57,13 +74,21 @@ func ReadDataByFilter(
mp *mpool.MPool,
tableName string,
) (sels []int64, err error) {
bat, release, err := LoadColumns(ctx, columns, colTypes, fs, info.MetaLocation(), mp, fileservice.Policy(0))
bat, rowidIdx, deleteMask, release, err := readBlockData(ctx, columns, colTypes, info, ts, fs, mp, nil, fileservice.Policy(0))
if err != nil {
return
}
defer release()
if rowidIdx >= 0 {
panic("use rowid to filter, seriouslly?")
}

sels = searchFunc(bat.Vecs)
if !deleteMask.IsEmpty() {
sels = removeIf(sels, func(i int64) bool {
return deleteMask.Contains(uint64(i))
})
}
sels, err = ds.ApplyTombstones(ctx, info.BlockID, sels)
if err != nil {
return
Expand Down
75 changes: 75 additions & 0 deletions pkg/vm/engine/test/disttae_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (
"time"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/pb/api"
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"github.com/matrixorigin/matrixone/pkg/util/executor"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae"
catalog2 "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
Expand Down Expand Up @@ -637,3 +639,76 @@ func TestCacheGC(t *testing.T) {
require.Equal(t, 2 /*test2 & test 4*/, r.TDelCpk)

}

// TestShowDatabasesInRestoreTxn creates database "db", records a timestamp,
// creates database "db2", flushes the first object of the mo_database
// relation, deletes "db2" inside a CN transaction, and then verifies that
// `show databases {MO_TS=<ts>}` executed in that same transaction lists only
// the databases that existed at the recorded timestamp.
func TestShowDatabasesInRestoreTxn(t *testing.T) {
	opts := config.WithLongScanAndCKPOpts(nil)
	p := testutil.InitEnginePack(testutil.TestOptions{TaeEngineOptions: opts}, t)
	defer p.Close()
	tae := p.T.GetDB()

	schema := catalog2.MockSchemaAll(10, -1)
	schema.Name = "test"
	txnop := p.StartCNTxn()
	p.CreateDBAndTable(txnop, "db", schema)
	require.NoError(t, txnop.Commit(p.Ctx))

	// Snapshot timestamp taken after "db" exists and before "db2" is created.
	ts := time.Now().UTC().UnixNano()

	// Make sure the second database is committed strictly after ts.
	time.Sleep(10 * time.Millisecond)

	schema2 := catalog2.MockSchemaAll(10, -1)
	schema2.Name = "test2"
	txnop = p.StartCNTxn()
	// Use schema2 here: it was prepared for this database but the original
	// code passed the first schema by mistake.
	p.CreateDBAndTable(txnop, "db2", schema2)
	require.NoError(t, txnop.Commit(p.Ctx))

	txn, err := tae.StartTxn(nil)
	require.NoError(t, err)
	catalogDB, err := txn.GetDatabaseByID(catalog.MO_CATALOG_ID)
	require.NoError(t, err)
	dbTbl, err := catalogDB.GetRelationByID(catalog.MO_DATABASE_ID)
	require.NoError(t, err)

	worker := ops.NewOpWorker(context.Background(), "xx")
	worker.Start()
	defer worker.Stop()

	// Flush the first object of mo_database.
	it := dbTbl.MakeObjectIt()
	it.Next()
	firstEntry := it.GetObject().GetMeta().(*catalog2.ObjectEntry)
	task1, err := jobs.NewFlushTableTailTask(
		tasks.WaitableCtx, txn,
		[]*catalog2.ObjectEntry{firstEntry},
		tae.Runtime, txn.GetStartTS())
	require.NoError(t, err)
	worker.SendOp(task1)
	err = task1.WaitDone(context.Background())
	require.NoError(t, err)

	require.NoError(t, txn.Commit(p.Ctx))

	// Delete "db2" in a new transaction and query the snapshot from within it.
	txnop = p.StartCNTxn()
	require.NoError(t, p.D.Engine.Delete(p.Ctx, "db2", txnop))
	v, ok := runtime.ServiceRuntime("").GetGlobalVariables(runtime.InternalSQLExecutor)
	require.True(t, ok, "missing sql executor in service %q", "")
	exec := v.(executor.SQLExecutor)
	res, err := exec.Exec(p.Ctx, fmt.Sprintf("show databases {MO_TS=%d}", ts), executor.Options{}.WithTxn(txnop))
	require.NoError(t, err)
	var rels []string
	for _, b := range res.Batches {
		for i, v := 0, b.Vecs[0]; i < v.Length(); i++ {
			rels = append(rels, v.GetStringAt(i))
		}
	}
	require.Equal(t, 2, len(rels), rels) // mo_catalog + db
	require.NotContains(t, rels, "db2")
	// res, err = exec.Exec(p.Ctx, "show databases", executor.Options{}.WithTxn(txnop))
	// var brels []string
	// for _, b := range res.Batches {
	// 	for i, v := 0, b.Vecs[0]; i < v.Length(); i++ {
	// 		brels = append(brels, v.GetStringAt(i))
	// 	}
	// }
	// require.NoError(t, err)
	// t.Log(rels, brels)

}
Loading