diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index 6008797c55d15..cb9132d53b43b 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -42,7 +42,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" - "github.com/matrixorigin/matrixone/pkg/pb/lock" "github.com/matrixorigin/matrixone/pkg/pb/pipeline" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/pb/timestamp" @@ -126,6 +125,7 @@ func NewCompile( c.startAt = startAt c.disableRetry = false c.ncpu = system.GoMaxProcs() + c.lockMeta = NewLockMeta() if c.proc.GetTxnOperator() != nil { // TODO: The action of updating the WriteOffset logic should be executed in the `func (c *Compile) Run(_ uint64)` method. // However, considering that the delay ranges are not completed yet, the UpdateSnapshotWriteOffset() and @@ -178,6 +178,9 @@ func (c *Compile) Reset(proc *process.Process, startAt time.Time, fill func(*bat c.affectRows.Store(0) c.anal.Reset() + if c.lockMeta != nil { + c.lockMeta.reset(c.proc) + } for _, s := range c.scopes { s.Reset(c) } @@ -255,14 +258,16 @@ func (c *Compile) clear() { c.hasMergeOp = false c.needBlock = false + if c.lockMeta != nil { + c.lockMeta.clear(c.proc) + c.lockMeta = nil + } + for _, exe := range c.filterExprExes { exe.Free() } c.filterExprExes = nil - for k := range c.metaTables { - delete(c.metaTables, k) - } for k := range c.lockTables { delete(c.lockTables, k) } @@ -464,7 +469,7 @@ func (c *Compile) printPipeline() { // 2. init data source. func (c *Compile) prePipelineInitializer() (err error) { // do table lock. - if err = c.lockMetaTables(); err != nil { + if err = c.lockMeta.doLock(c.e, c.proc); err != nil { return err } if err = c.lockTable(); err != nil { @@ -691,53 +696,7 @@ func (c *Compile) appendMetaTables(objRes *plan.ObjectRef) { if !c.needLockMeta { return } - - if objRes.SchemaName == catalog.MO_CATALOG && (objRes.ObjName == catalog.MO_DATABASE || objRes.ObjName == catalog.MO_TABLES || objRes.ObjName == catalog.MO_COLUMNS) { - // do not lock meta table for meta table - } else { - key := fmt.Sprintf("%s %s", objRes.SchemaName, objRes.ObjName) - c.metaTables[key] = struct{}{} - } -} - -func (c *Compile) lockMetaTables() error { - lockLen := len(c.metaTables) - if lockLen == 0 { - return nil - } - - tables := make([]string, 0, lockLen) - for table := range c.metaTables { - tables = append(tables, table) - } - sort.Strings(tables) - - lockDbs := make(map[string]struct{}) - for _, table := range tables { - names := strings.SplitN(table, " ", 2) - - err := lockMoTable(c, names[0], names[1], lock.LockMode_Shared) - lockDbs[names[0]] = struct{}{} - if err != nil { - // if get error in locking mocatalog.mo_tables by it's dbName & tblName - // that means the origin table's schema was changed. then return NeedRetryWithDefChanged err - if moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetry) || - moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetryWithDefChanged) { - return moerr.NewTxnNeedRetryWithDefChangedNoCtx() - } - - // other errors, just throw out - return err - } - } - for dbName := range lockDbs { - err := lockMoDatabase(c, dbName, lock.LockMode_Shared) - if err != nil { - return err - } - } - - return nil + c.lockMeta.appendMetaTables(objRes) } func (c *Compile) lockTable() error { diff --git a/pkg/sql/compile/ddl.go b/pkg/sql/compile/ddl.go index 8431db1462132..ced4267b8cc81 100644 --- a/pkg/sql/compile/ddl.go +++ b/pkg/sql/compile/ddl.go @@ -3722,7 +3722,7 @@ func lockMoDatabase(c *Compile, dbName string, lockMode lock.LockMode) error { return err } defer vec.Free(c.proc.Mp()) - if err := lockRows(c.e, c.proc, dbRel, vec, lockMode, lock.Sharding_ByRow, accountID); err != nil { + if err := lockRows(c.e, c.proc, dbRel, vec, lockMode, lock.Sharding_None, accountID); err != nil { return err } return nil @@ -3744,7 +3744,7 @@ func lockMoTable( } defer vec.Free(c.proc.Mp()) - if err := lockRows(c.e, c.proc, dbRel, vec, lockMode, lock.Sharding_ByRow, accountID); err != nil { + if err := lockRows(c.e, c.proc, dbRel, vec, lockMode, lock.Sharding_None, accountID); err != nil { return err } return nil diff --git a/pkg/sql/compile/lock_meta.go b/pkg/sql/compile/lock_meta.go new file mode 100644 index 0000000000000..08293c3b15c68 --- /dev/null +++ b/pkg/sql/compile/lock_meta.go @@ -0,0 +1,261 @@ +// Copyright 2023 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package compile + +import ( + "fmt" + "sort" + "strings" + + "github.com/matrixorigin/matrixone/pkg/catalog" + "github.com/matrixorigin/matrixone/pkg/common/moerr" + "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/pb/lock" + "github.com/matrixorigin/matrixone/pkg/pb/plan" + "github.com/matrixorigin/matrixone/pkg/sql/colexec" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/lockop" + plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" + "github.com/matrixorigin/matrixone/pkg/sql/plan/function" + "github.com/matrixorigin/matrixone/pkg/vm/engine" + "github.com/matrixorigin/matrixone/pkg/vm/process" +) + +type LockMeta struct { + database_table_id uint64 //table id of mo_database + table_table_id uint64 //table id of mo_tables + + metaTables map[string]struct{} //key: (db_name table_name) + lockDbExe colexec.ExpressionExecutor //executor to serial function to lock mo_database + lockTableExe colexec.ExpressionExecutor //executor to serial function to lock mo_tables + lockMetaVecs []*vector.Vector //paramters for serial function +} + +func NewLockMeta() *LockMeta { + return &LockMeta{} +} + +func (l *LockMeta) reset(_ *process.Process) { + if l.lockDbExe != nil { + l.lockDbExe.ResetForNextQuery() + } + if l.lockTableExe != nil { + l.lockTableExe.ResetForNextQuery() + } +} + +func (l *LockMeta) clear(proc *process.Process) { + for k := range l.metaTables { + delete(l.metaTables, k) + } + if l.lockDbExe != nil { + l.lockDbExe.Free() + l.lockDbExe = nil + } + if l.lockTableExe != nil { + l.lockTableExe.Free() + l.lockTableExe = nil + } + for _, vec := range l.lockMetaVecs { + vec.Free(proc.Mp()) + } + l.lockMetaVecs = nil +} + +func (l *LockMeta) appendMetaTables(objRes *plan.ObjectRef) { + if l.metaTables == nil { + l.metaTables = make(map[string]struct{}) + } + + if objRes.SchemaName == catalog.MO_CATALOG && (objRes.ObjName == catalog.MO_DATABASE || objRes.ObjName == catalog.MO_TABLES || objRes.ObjName == catalog.MO_COLUMNS) { + // do not lock meta table for meta table + } else { + key := fmt.Sprintf("%s %s", objRes.SchemaName, objRes.ObjName) + l.metaTables[key] = struct{}{} + } +} + +func (l *LockMeta) doLock(e engine.Engine, proc *process.Process) error { + lockLen := len(l.metaTables) + if lockLen == 0 { + return nil + } + + tables := make([]string, 0, lockLen) + for table := range l.metaTables { + tables = append(tables, table) + } + sort.Strings(tables) + + if err := l.initLockExe(e, proc); err != nil { + return err + } + + if len(l.lockMetaVecs) == 0 { + accountIdVec := vector.NewVec(types.T_uint32.ToType()) + dbName := vector.NewVec(types.T_varchar.ToType()) + tableName := vector.NewVec(types.T_varchar.ToType()) + l.lockMetaVecs = []*vector.Vector{ + accountIdVec, + dbName, + tableName, + } + } else { + for _, vec := range l.lockMetaVecs { + vec.CleanOnlyData() + } + } + + accountId := proc.GetSessionInfo().AccountId + lockDbs := make(map[string]struct{}) + bat := batch.NewWithSize(3) + for _, table := range tables { + names := strings.SplitN(table, " ", 2) + err := vector.AppendFixed(l.lockMetaVecs[0], accountId, false, proc.GetMPool()) //account_id + if err != nil { + return err + } + err = vector.AppendBytes(l.lockMetaVecs[1], []byte(names[0]), false, proc.GetMPool()) //db_name + if err != nil { + return err + } + err = vector.AppendBytes(l.lockMetaVecs[2], []byte(names[1]), false, proc.GetMPool()) //table_name + if err != nil { + return err + } + lockDbs[names[0]] = struct{}{} + } + + // call serial function to lock mo_tables + bat.Vecs = l.lockMetaVecs + err := l.lockMetaRows(e, proc, l.lockTableExe, bat, accountId, l.table_table_id) + if err != nil { + return err + } + + // recall serial function to lock mo_databases + l.lockMetaVecs[0].CleanOnlyData() + l.lockMetaVecs[1].CleanOnlyData() + if len(lockDbs) > 1 { + for dbName := range lockDbs { + err := vector.AppendFixed(l.lockMetaVecs[0], accountId, false, proc.GetMPool()) //account_id + if err != nil { + return err + } + err = vector.AppendBytes(l.lockMetaVecs[1], []byte(dbName), false, proc.GetMPool()) //db_name + if err != nil { + return err + } + } + } + bat.Vecs = l.lockMetaVecs[:2] + return l.lockMetaRows(e, proc, l.lockDbExe, bat, accountId, l.database_table_id) +} + +func (l *LockMeta) lockMetaRows(e engine.Engine, proc *process.Process, executor colexec.ExpressionExecutor, bat *batch.Batch, accountId uint32, tableId uint64) error { + executor.ResetForNextQuery() + bat.SetRowCount(l.lockMetaVecs[0].Length()) + lockVec, err := executor.Eval(proc, []*batch.Batch{bat}, nil) + if err != nil { + return err + } + if err := lockop.LockRows(e, proc, nil, tableId, lockVec, *lockVec.GetType(), lock.LockMode_Shared, lock.Sharding_None, accountId); err != nil { + // if get error in locking mocatalog.mo_tables by it's dbName & tblName + // that means the origin table's schema was changed. then return NeedRetryWithDefChanged err + if moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetry) || + moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetryWithDefChanged) { + return moerr.NewTxnNeedRetryWithDefChangedNoCtx() + } + + // other errors, just throw out + return err + } + return nil +} + +func (l *LockMeta) initLockExe(e engine.Engine, proc *process.Process) error { + if l.lockTableExe != nil { + return nil + } + + accountTyp := types.T_uint32.ToType() + accountIdExpr := &plan.Expr{ + Typ: plan2.MakePlan2Type(&accountTyp), + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + ColPos: 0, + }, + }, + } + + dbNameTyp := types.T_varchar.ToType() + dbNameExpr := &plan.Expr{ + Typ: plan2.MakePlan2Type(&dbNameTyp), + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + ColPos: 1, + }, + }, + } + + tblNameTyp := types.T_varchar.ToType() + tblNameExpr := &plan.Expr{ + Typ: plan2.MakePlan2Type(&tblNameTyp), + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + ColPos: 2, + }, + }, + } + + lockDbExpr, err := plan2.BindFuncExprImplByPlanExpr(proc.Ctx, function.SerialFunctionName, []*plan.Expr{accountIdExpr, dbNameExpr}) + if err != nil { + return err + } + exec, err := colexec.NewExpressionExecutor(proc, lockDbExpr) + if err != nil { + return err + } + l.lockDbExe = exec + + lockTblxpr, err := plan2.BindFuncExprImplByPlanExpr(proc.Ctx, function.SerialFunctionName, []*plan.Expr{accountIdExpr, dbNameExpr, tblNameExpr}) + if err != nil { + return err + } + exec, err = colexec.NewExpressionExecutor(proc, lockTblxpr) + if err != nil { + return err + } + l.lockTableExe = exec + + dbSource, err := e.Database(proc.Ctx, catalog.MO_CATALOG, proc.GetTxnOperator()) + if err != nil { + return err + } + rel, err := dbSource.Relation(proc.Ctx, catalog.MO_DATABASE, nil) + if err != nil { + return err + } + l.database_table_id = rel.GetTableID(proc.Ctx) + + rel, err = dbSource.Relation(proc.Ctx, catalog.MO_TABLES, nil) + if err != nil { + return err + } + l.table_table_id = rel.GetTableID(proc.Ctx) + + return nil +} diff --git a/pkg/sql/compile/reuse.go b/pkg/sql/compile/reuse.go index 44b9c5f628bdf..3dc2ce95f00f8 100644 --- a/pkg/sql/compile/reuse.go +++ b/pkg/sql/compile/reuse.go @@ -35,7 +35,6 @@ func init() { counterSet: &perfcounter.CounterSet{}, nodeRegs: make(map[[2]int32]*process.WaitRegister), stepRegs: make(map[int32][][2]int32), - metaTables: make(map[string]struct{}), lockTables: make(map[uint64]*plan.LockTarget), MessageBoard: message.NewMessageBoard(), } diff --git a/pkg/sql/compile/types.go b/pkg/sql/compile/types.go index 260250429a762..d78cadc2809b4 100644 --- a/pkg/sql/compile/types.go +++ b/pkg/sql/compile/types.go @@ -285,7 +285,7 @@ type Compile struct { needLockMeta bool needBlock bool - metaTables map[string]struct{} + lockMeta *LockMeta lockTables map[uint64]*plan.LockTarget disableRetry bool