Skip to content

Commit

Permalink
improve lock meta table for DML (#20763)
Browse files Browse the repository at this point in the history
improve lock meta table for DML
1、复用上meta锁的vector
2、去掉上锁时候每次都获取relation的逻辑,提前拿table id
3、当要上多个表的meta锁的时候,合并到一个vector

Approved by: @zhangxu19830126, @badboynt1
  • Loading branch information
ouyuanning authored Dec 16, 2024
1 parent d923d72 commit fa32fcb
Show file tree
Hide file tree
Showing 5 changed files with 275 additions and 56 deletions.
63 changes: 11 additions & 52 deletions pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/pb/lock"
"github.com/matrixorigin/matrixone/pkg/pb/pipeline"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
Expand Down Expand Up @@ -139,6 +138,7 @@ func NewCompile(
c.startAt = startAt
c.disableRetry = false
c.ncpu = system.GoMaxProcs()
c.lockMeta = NewLockMeta()
if c.proc.GetTxnOperator() != nil {
// TODO: The action of updating the WriteOffset logic should be executed in the `func (c *Compile) Run(_ uint64)` method.
// However, considering that the delay ranges are not completed yet, the UpdateSnapshotWriteOffset() and
Expand Down Expand Up @@ -191,6 +191,9 @@ func (c *Compile) Reset(proc *process.Process, startAt time.Time, fill func(*bat
c.affectRows.Store(0)
c.anal.Reset()

if c.lockMeta != nil {
c.lockMeta.reset(c.proc)
}
for _, s := range c.scopes {
s.Reset(c)
}
Expand Down Expand Up @@ -268,14 +271,16 @@ func (c *Compile) clear() {
c.hasMergeOp = false
c.needBlock = false

if c.lockMeta != nil {
c.lockMeta.clear(c.proc)
c.lockMeta = nil
}

for _, exe := range c.filterExprExes {
exe.Free()
}
c.filterExprExes = nil

for k := range c.metaTables {
delete(c.metaTables, k)
}
for k := range c.lockTables {
delete(c.lockTables, k)
}
Expand Down Expand Up @@ -477,7 +482,7 @@ func (c *Compile) printPipeline() {
// 2. init data source.
func (c *Compile) prePipelineInitializer() (err error) {
// do table lock.
if err = c.lockMetaTables(); err != nil {
if err = c.lockMeta.doLock(c.e, c.proc); err != nil {
return err
}
if err = c.lockTable(); err != nil {
Expand Down Expand Up @@ -716,53 +721,7 @@ func (c *Compile) appendMetaTables(objRes *plan.ObjectRef) {
if !c.needLockMeta {
return
}

if objRes.SchemaName == catalog.MO_CATALOG && (objRes.ObjName == catalog.MO_DATABASE || objRes.ObjName == catalog.MO_TABLES || objRes.ObjName == catalog.MO_COLUMNS) {
// do not lock meta table for meta table
} else {
key := fmt.Sprintf("%s %s", objRes.SchemaName, objRes.ObjName)
c.metaTables[key] = struct{}{}
}
}

func (c *Compile) lockMetaTables() error {
lockLen := len(c.metaTables)
if lockLen == 0 {
return nil
}

tables := make([]string, 0, lockLen)
for table := range c.metaTables {
tables = append(tables, table)
}
sort.Strings(tables)

lockDbs := make(map[string]struct{})
for _, table := range tables {
names := strings.SplitN(table, " ", 2)

err := lockMoTable(c, names[0], names[1], lock.LockMode_Shared)
lockDbs[names[0]] = struct{}{}
if err != nil {
// if get error in locking mocatalog.mo_tables by it's dbName & tblName
// that means the origin table's schema was changed. then return NeedRetryWithDefChanged err
if moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetry) ||
moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetryWithDefChanged) {
return moerr.NewTxnNeedRetryWithDefChangedNoCtx()
}

// other errors, just throw out
return err
}
}
for dbName := range lockDbs {
err := lockMoDatabase(c, dbName, lock.LockMode_Shared)
if err != nil {
return err
}
}

return nil
c.lockMeta.appendMetaTables(objRes)
}

func (c *Compile) lockTable() error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/compile/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3720,7 +3720,7 @@ func lockMoDatabase(c *Compile, dbName string, lockMode lock.LockMode) error {
return err
}
defer vec.Free(c.proc.Mp())
if err := lockRows(c.e, c.proc, dbRel, vec, lockMode, lock.Sharding_ByRow, accountID); err != nil {
if err := lockRows(c.e, c.proc, dbRel, vec, lockMode, lock.Sharding_None, accountID); err != nil {
return err
}
return nil
Expand All @@ -3742,7 +3742,7 @@ func lockMoTable(
}
defer vec.Free(c.proc.Mp())

if err := lockRows(c.e, c.proc, dbRel, vec, lockMode, lock.Sharding_ByRow, accountID); err != nil {
if err := lockRows(c.e, c.proc, dbRel, vec, lockMode, lock.Sharding_None, accountID); err != nil {
return err
}
return nil
Expand Down
261 changes: 261 additions & 0 deletions pkg/sql/compile/lock_meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
// Copyright 2023 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package compile

import (
"fmt"
"sort"
"strings"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/pb/lock"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/lockop"
plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

type LockMeta struct {
database_table_id uint64 //table id of mo_database
table_table_id uint64 //table id of mo_tables

metaTables map[string]struct{} //key: (db_name table_name)
lockDbExe colexec.ExpressionExecutor //executor to serial function to lock mo_database
lockTableExe colexec.ExpressionExecutor //executor to serial function to lock mo_tables
lockMetaVecs []*vector.Vector //paramters for serial function
}

func NewLockMeta() *LockMeta {
return &LockMeta{}
}

func (l *LockMeta) reset(_ *process.Process) {
if l.lockDbExe != nil {
l.lockDbExe.ResetForNextQuery()
}
if l.lockTableExe != nil {
l.lockTableExe.ResetForNextQuery()
}
}

func (l *LockMeta) clear(proc *process.Process) {
for k := range l.metaTables {
delete(l.metaTables, k)
}
if l.lockDbExe != nil {
l.lockDbExe.Free()
l.lockDbExe = nil
}
if l.lockTableExe != nil {
l.lockTableExe.Free()
l.lockTableExe = nil
}
for _, vec := range l.lockMetaVecs {
vec.Free(proc.Mp())
}
l.lockMetaVecs = nil
}

func (l *LockMeta) appendMetaTables(objRes *plan.ObjectRef) {
if l.metaTables == nil {
l.metaTables = make(map[string]struct{})
}

if objRes.SchemaName == catalog.MO_CATALOG && (objRes.ObjName == catalog.MO_DATABASE || objRes.ObjName == catalog.MO_TABLES || objRes.ObjName == catalog.MO_COLUMNS) {
// do not lock meta table for meta table
} else {
key := fmt.Sprintf("%s %s", objRes.SchemaName, objRes.ObjName)
l.metaTables[key] = struct{}{}
}
}

func (l *LockMeta) doLock(e engine.Engine, proc *process.Process) error {
lockLen := len(l.metaTables)
if lockLen == 0 {
return nil
}

tables := make([]string, 0, lockLen)
for table := range l.metaTables {
tables = append(tables, table)
}
sort.Strings(tables)

if err := l.initLockExe(e, proc); err != nil {
return err
}

if len(l.lockMetaVecs) == 0 {
accountIdVec := vector.NewVec(types.T_uint32.ToType())
dbName := vector.NewVec(types.T_varchar.ToType())
tableName := vector.NewVec(types.T_varchar.ToType())
l.lockMetaVecs = []*vector.Vector{
accountIdVec,
dbName,
tableName,
}
} else {
for _, vec := range l.lockMetaVecs {
vec.CleanOnlyData()
}
}

accountId := proc.GetSessionInfo().AccountId
lockDbs := make(map[string]struct{})
bat := batch.NewWithSize(3)
for _, table := range tables {
names := strings.SplitN(table, " ", 2)
err := vector.AppendFixed(l.lockMetaVecs[0], accountId, false, proc.GetMPool()) //account_id
if err != nil {
return err
}
err = vector.AppendBytes(l.lockMetaVecs[1], []byte(names[0]), false, proc.GetMPool()) //db_name
if err != nil {
return err
}
err = vector.AppendBytes(l.lockMetaVecs[2], []byte(names[1]), false, proc.GetMPool()) //table_name
if err != nil {
return err
}
lockDbs[names[0]] = struct{}{}
}

// call serial function to lock mo_tables
bat.Vecs = l.lockMetaVecs
err := l.lockMetaRows(e, proc, l.lockTableExe, bat, accountId, l.table_table_id)
if err != nil {
return err
}

// recall serial function to lock mo_databases
l.lockMetaVecs[0].CleanOnlyData()
l.lockMetaVecs[1].CleanOnlyData()
if len(lockDbs) > 1 {
for dbName := range lockDbs {
err := vector.AppendFixed(l.lockMetaVecs[0], accountId, false, proc.GetMPool()) //account_id
if err != nil {
return err
}
err = vector.AppendBytes(l.lockMetaVecs[1], []byte(dbName), false, proc.GetMPool()) //db_name
if err != nil {
return err
}
}
}
bat.Vecs = l.lockMetaVecs[:2]
return l.lockMetaRows(e, proc, l.lockDbExe, bat, accountId, l.database_table_id)
}

func (l *LockMeta) lockMetaRows(e engine.Engine, proc *process.Process, executor colexec.ExpressionExecutor, bat *batch.Batch, accountId uint32, tableId uint64) error {
executor.ResetForNextQuery()
bat.SetRowCount(l.lockMetaVecs[0].Length())
lockVec, err := executor.Eval(proc, []*batch.Batch{bat}, nil)
if err != nil {
return err
}
if err := lockop.LockRows(e, proc, nil, tableId, lockVec, *lockVec.GetType(), lock.LockMode_Shared, lock.Sharding_None, accountId); err != nil {
// if get error in locking mocatalog.mo_tables by it's dbName & tblName
// that means the origin table's schema was changed. then return NeedRetryWithDefChanged err
if moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetry) ||
moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetryWithDefChanged) {
return moerr.NewTxnNeedRetryWithDefChangedNoCtx()
}

// other errors, just throw out
return err
}
return nil
}

func (l *LockMeta) initLockExe(e engine.Engine, proc *process.Process) error {
if l.lockTableExe != nil {
return nil
}

accountTyp := types.T_uint32.ToType()
accountIdExpr := &plan.Expr{
Typ: plan2.MakePlan2Type(&accountTyp),
Expr: &plan.Expr_Col{
Col: &plan.ColRef{
ColPos: 0,
},
},
}

dbNameTyp := types.T_varchar.ToType()
dbNameExpr := &plan.Expr{
Typ: plan2.MakePlan2Type(&dbNameTyp),
Expr: &plan.Expr_Col{
Col: &plan.ColRef{
ColPos: 1,
},
},
}

tblNameTyp := types.T_varchar.ToType()
tblNameExpr := &plan.Expr{
Typ: plan2.MakePlan2Type(&tblNameTyp),
Expr: &plan.Expr_Col{
Col: &plan.ColRef{
ColPos: 2,
},
},
}

lockDbExpr, err := plan2.BindFuncExprImplByPlanExpr(proc.Ctx, function.SerialFunctionName, []*plan.Expr{accountIdExpr, dbNameExpr})
if err != nil {
return err
}
exec, err := colexec.NewExpressionExecutor(proc, lockDbExpr)
if err != nil {
return err
}
l.lockDbExe = exec

lockTblxpr, err := plan2.BindFuncExprImplByPlanExpr(proc.Ctx, function.SerialFunctionName, []*plan.Expr{accountIdExpr, dbNameExpr, tblNameExpr})
if err != nil {
return err
}
exec, err = colexec.NewExpressionExecutor(proc, lockTblxpr)
if err != nil {
return err
}
l.lockTableExe = exec

dbSource, err := e.Database(proc.Ctx, catalog.MO_CATALOG, proc.GetTxnOperator())
if err != nil {
return err
}
rel, err := dbSource.Relation(proc.Ctx, catalog.MO_DATABASE, nil)
if err != nil {
return err
}
l.database_table_id = rel.GetTableID(proc.Ctx)

rel, err = dbSource.Relation(proc.Ctx, catalog.MO_TABLES, nil)
if err != nil {
return err
}
l.table_table_id = rel.GetTableID(proc.Ctx)

return nil
}
1 change: 0 additions & 1 deletion pkg/sql/compile/reuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func init() {
counterSet: &perfcounter.CounterSet{},
nodeRegs: make(map[[2]int32]*process.WaitRegister),
stepRegs: make(map[int32][][2]int32),
metaTables: make(map[string]struct{}),
lockTables: make(map[uint64]*plan.LockTarget),
MessageBoard: message.NewMessageBoard(),
}
Expand Down
Loading

0 comments on commit fa32fcb

Please sign in to comment.