From 8404a1f1d48608d10a6ed56b94f84f3c8e355e95 Mon Sep 17 00:00:00 2001 From: ou yuanning <45346669+ouyuanning@users.noreply.github.com> Date: Tue, 22 Aug 2023 16:14:54 +0800 Subject: [PATCH] clean compile before put compile to pool (#11382) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 原来是在从pool get的时候clear。 会让调用方误以为Run这个函数使用之后,Compile对象还存在。但是实际上它是可能存在,也可能不存在。 因为当并发很高,其他地方快速从pool中get一个compile出来,然后进行clear的话。那么会导致前面那个调用方所拿到的compile中的数据是错误的(比如affect_rows,因为被另外一个调用方clear了) 这个重构,给Run方法增加了返回值。如果需要信息,调用方应该从返回值中拿。当Run结束后,应该认为compile对象内的所有信息已经被清空。 Approved by: @m-schen, @nnsgmsone, @zhangxu19830126, @daviszhen, @qingxinhome --- pkg/frontend/cmd_executor.go | 10 +++- pkg/frontend/computation_wrapper.go | 26 ++++++----- pkg/frontend/mysql_cmd_executor.go | 14 ++++-- pkg/frontend/mysql_cmd_executor_test.go | 8 ++-- pkg/frontend/routine_test.go | 7 +-- pkg/frontend/test/types_mock.go | 15 +++--- pkg/frontend/types.go | 6 ++- pkg/sql/compile/compile.go | 62 ++++++++++++++++--------- pkg/sql/compile/compile_test.go | 8 ++-- pkg/sql/compile/sql_executor.go | 7 ++- pkg/util/run_result.go | 19 ++++++++ 11 files changed, 122 insertions(+), 60 deletions(-) create mode 100644 pkg/util/run_result.go diff --git a/pkg/frontend/cmd_executor.go b/pkg/frontend/cmd_executor.go index d24a67a1a26ef..a534a73794ddd 100644 --- a/pkg/frontend/cmd_executor.go +++ b/pkg/frontend/cmd_executor.go @@ -20,6 +20,7 @@ import ( "time" "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" + "github.com/matrixorigin/matrixone/pkg/util" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/frontend/constant" @@ -254,9 +255,14 @@ type baseStmtExecutor struct { ComputationWrapper tenantName string status stmtExecStatus + runResult *util.RunResult err error } +func (bse *baseStmtExecutor) GetAffectedRows() uint64 { + return bse.runResult.AffectRows +} + func (bse *baseStmtExecutor) GetStatus() stmtExecStatus { return bse.status } @@ -311,7 +317,9 @@ func (bse *baseStmtExecutor) CommitOrRollbackTxn(ctx context.Context, ses *Sessi } func (bse *baseStmtExecutor) ExecuteImpl(ctx context.Context, ses *Session) error { - return bse.Run(0) + runResult, err := bse.Run(0) + bse.runResult = runResult + return err } func (bse *baseStmtExecutor) Setup(ctx context.Context, ses *Session) error { diff --git a/pkg/frontend/computation_wrapper.go b/pkg/frontend/computation_wrapper.go index 3904409d204fd..45c2be644695d 100644 --- a/pkg/frontend/computation_wrapper.go +++ b/pkg/frontend/computation_wrapper.go @@ -35,6 +35,7 @@ import ( plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/sql/util" "github.com/matrixorigin/matrixone/pkg/txn/clock" + util2 "github.com/matrixorigin/matrixone/pkg/util" "github.com/matrixorigin/matrixone/pkg/util/trace/impl/motrace" "github.com/matrixorigin/matrixone/pkg/vm/engine/memoryengine" "github.com/matrixorigin/matrixone/pkg/vm/process" @@ -81,8 +82,8 @@ func (ncw *NullComputationWrapper) GetUUID() []byte { return ncw.uuid[:] } -func (ncw *NullComputationWrapper) Run(ts uint64) error { - return nil +func (ncw *NullComputationWrapper) Run(ts uint64) (*util2.RunResult, error) { + return nil, nil } func (ncw *NullComputationWrapper) GetLoadTag() bool { @@ -90,11 +91,12 @@ func (ncw *NullComputationWrapper) GetLoadTag() bool { } type TxnComputationWrapper struct { - stmt tree.Statement - plan *plan2.Plan - proc *process.Process - ses *Session - compile *compile.Compile + stmt tree.Statement + plan *plan2.Plan + proc *process.Process + ses *Session + compile *compile.Compile + runResult *util2.RunResult uuid uuid.UUID } @@ -185,7 +187,7 @@ func (cwft *TxnComputationWrapper) GetClock() clock.Clock { } func (cwft *TxnComputationWrapper) GetAffectedRows() uint64 { - return cwft.compile.GetAffectedRows() + return cwft.runResult.AffectRows } func (cwft *TxnComputationWrapper) GetServerStatus() uint16 { @@ -418,13 +420,15 @@ func (cwft *TxnComputationWrapper) GetUUID() []byte { return cwft.uuid[:] } -func (cwft *TxnComputationWrapper) Run(ts uint64) error { +func (cwft *TxnComputationWrapper) Run(ts uint64) (*util2.RunResult, error) { logDebug(cwft.ses, cwft.ses.GetDebugString(), "compile.Run begin") defer func() { logDebug(cwft.ses, cwft.ses.GetDebugString(), "compile.Run end") }() - err := cwft.compile.Run(ts) - return err + runResult, err := cwft.compile.Run(ts) + cwft.runResult = runResult + cwft.compile = nil + return runResult, err } func (cwft *TxnComputationWrapper) GetLoadTag() bool { diff --git a/pkg/frontend/mysql_cmd_executor.go b/pkg/frontend/mysql_cmd_executor.go index de996435e6c51..440398b16b80c 100644 --- a/pkg/frontend/mysql_cmd_executor.go +++ b/pkg/frontend/mysql_cmd_executor.go @@ -54,6 +54,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/sql/plan/explain" + util2 "github.com/matrixorigin/matrixone/pkg/util" "github.com/matrixorigin/matrixone/pkg/util/metric" "github.com/matrixorigin/matrixone/pkg/util/trace" "github.com/matrixorigin/matrixone/pkg/util/trace/impl/motrace" @@ -2508,6 +2509,7 @@ func (mce *MysqlCmdExecutor) executeStmt(requestCtx context.Context, userName string, ) (retErr error) { var err error + var runResult *util2.RunResult var cmpBegin time.Time var ret interface{} var runner ComputationRunner @@ -3185,7 +3187,7 @@ func (mce *MysqlCmdExecutor) executeStmt(requestCtx context.Context, } } // todo: add trace - if err = runner.Run(0); err != nil { + if _, err = runner.Run(0); err != nil { return err } @@ -3267,7 +3269,7 @@ func (mce *MysqlCmdExecutor) executeStmt(requestCtx context.Context, } } - if err = runner.Run(0); err != nil { + if runResult, err = runner.Run(0); err != nil { if loadLocalErrGroup != nil { // release resources err2 = proc.LoadLocalReader.Close() if err2 != nil { @@ -3296,7 +3298,11 @@ func (mce *MysqlCmdExecutor) executeStmt(requestCtx context.Context, logInfo(ses, "time of Exec.Run : %s", time.Since(runBegin).String()) } - rspLen = cw.GetAffectedRows() + if runResult == nil { + rspLen = 0 + } else { + rspLen = runResult.AffectRows + } echoTime := time.Now() logDebug(ses, "time of SendResponse %s", time.Since(echoTime).String()) @@ -3352,7 +3358,7 @@ func (mce *MysqlCmdExecutor) executeStmt(requestCtx context.Context, /* Step 1: Start */ - if err = runner.Run(0); err != nil { + if _, err = runner.Run(0); err != nil { return err } diff --git a/pkg/frontend/mysql_cmd_executor_test.go b/pkg/frontend/mysql_cmd_executor_test.go index e471f73ad26bf..9c39befa544e4 100644 --- a/pkg/frontend/mysql_cmd_executor_test.go +++ b/pkg/frontend/mysql_cmd_executor_test.go @@ -111,7 +111,7 @@ func Test_mce(t *testing.T) { use_t.EXPECT().RecordExecPlan(ctx).Return(nil).AnyTimes() runner := mock_frontend.NewMockComputationRunner(ctrl) - runner.EXPECT().Run(gomock.Any()).Return(nil).AnyTimes() + runner.EXPECT().Run(gomock.Any()).Return(nil, nil).AnyTimes() create_1 := mock_frontend.NewMockComputationWrapper(ctrl) stmts, err = parsers.Parse(ctx, dialect.MYSQL, "create table A(a varchar(100),b int,c float)", 1) @@ -122,7 +122,7 @@ func Test_mce(t *testing.T) { create_1.EXPECT().GetUUID().Return(make([]byte, 16)).AnyTimes() create_1.EXPECT().SetDatabaseName(gomock.Any()).Return(nil).AnyTimes() create_1.EXPECT().Compile(gomock.Any(), gomock.Any(), gomock.Any()).Return(runner, nil).AnyTimes() - create_1.EXPECT().Run(gomock.Any()).Return(nil).AnyTimes() + create_1.EXPECT().Run(gomock.Any()).Return(nil, nil).AnyTimes() create_1.EXPECT().GetLoadTag().Return(false).AnyTimes() create_1.EXPECT().GetAffectedRows().Return(uint64(0)).AnyTimes() create_1.EXPECT().RecordExecPlan(ctx).Return(nil).AnyTimes() @@ -136,7 +136,7 @@ func Test_mce(t *testing.T) { select_1.EXPECT().GetUUID().Return(make([]byte, 16)).AnyTimes() select_1.EXPECT().SetDatabaseName(gomock.Any()).Return(nil).AnyTimes() select_1.EXPECT().Compile(gomock.Any(), gomock.Any(), gomock.Any()).Return(runner, nil).AnyTimes() - select_1.EXPECT().Run(gomock.Any()).Return(nil).AnyTimes() + select_1.EXPECT().Run(gomock.Any()).Return(nil, nil).AnyTimes() select_1.EXPECT().GetLoadTag().Return(false).AnyTimes() select_1.EXPECT().RecordExecPlan(ctx).Return(nil).AnyTimes() @@ -214,7 +214,7 @@ func Test_mce(t *testing.T) { select_2.EXPECT().GetUUID().Return(make([]byte, 16)).AnyTimes() select_2.EXPECT().SetDatabaseName(gomock.Any()).Return(nil).AnyTimes() select_2.EXPECT().Compile(gomock.Any(), gomock.Any(), gomock.Any()).Return(runner, nil).AnyTimes() - select_2.EXPECT().Run(gomock.Any()).Return(nil).AnyTimes() + select_2.EXPECT().Run(gomock.Any()).Return(nil, nil).AnyTimes() select_2.EXPECT().GetLoadTag().Return(false).AnyTimes() select_2.EXPECT().GetAffectedRows().Return(uint64(0)).AnyTimes() select_2.EXPECT().GetColumns().Return(self_handle_sql_columns[i], nil).AnyTimes() diff --git a/pkg/frontend/routine_test.go b/pkg/frontend/routine_test.go index 3d4c5dc853291..be7adb7f63fa0 100644 --- a/pkg/frontend/routine_test.go +++ b/pkg/frontend/routine_test.go @@ -32,6 +32,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/sql/parsers" "github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect" "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" + util "github.com/matrixorigin/matrixone/pkg/util" "github.com/matrixorigin/matrixone/pkg/util/metric" "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/process" @@ -103,7 +104,7 @@ var newMockWrapper = func(ctrl *gomock.Controller, ses *Session, } uuid, _ := uuid.NewUUID() runner := mock_frontend.NewMockComputationRunner(ctrl) - runner.EXPECT().Run(gomock.Any()).DoAndReturn(func(uint64) error { + runner.EXPECT().Run(gomock.Any()).DoAndReturn(func(uint64) (*util.RunResult, error) { proto := ses.GetMysqlProtocol() if mrs != nil { if res.isSleepSql { @@ -117,10 +118,10 @@ var newMockWrapper = func(ctrl *gomock.Controller, ses *Session, err = proto.SendResultSetTextBatchRowSpeedup(mrs, mrs.GetRowCount()) if err != nil { logutil.Errorf("flush error %v", err) - return err + return nil, err } } - return nil + return &util.RunResult{AffectRows: 0}, nil }).AnyTimes() mcw := mock_frontend.NewMockComputationWrapper(ctrl) mcw.EXPECT().GetAst().Return(stmt).AnyTimes() diff --git a/pkg/frontend/test/types_mock.go b/pkg/frontend/test/types_mock.go index a77dedca9dc15..10cdd91562994 100644 --- a/pkg/frontend/test/types_mock.go +++ b/pkg/frontend/test/types_mock.go @@ -12,6 +12,7 @@ import ( batch "github.com/matrixorigin/matrixone/pkg/container/batch" types "github.com/matrixorigin/matrixone/pkg/container/types" tree "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" + util "github.com/matrixorigin/matrixone/pkg/util" process "github.com/matrixorigin/matrixone/pkg/vm/process" ) @@ -39,11 +40,12 @@ func (m *MockComputationRunner) EXPECT() *MockComputationRunnerMockRecorder { } // Run mocks base method. -func (m *MockComputationRunner) Run(ts uint64) error { +func (m *MockComputationRunner) Run(ts uint64) (*util.RunResult, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Run", ts) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(*util.RunResult) + ret1, _ := ret[1].(error) + return ret0, ret1 } // Run indicates an expected call of Run. @@ -204,11 +206,12 @@ func (mr *MockComputationWrapperMockRecorder) RecordExecPlan(ctx interface{}) *g } // Run mocks base method. -func (m *MockComputationWrapper) Run(ts uint64) error { +func (m *MockComputationWrapper) Run(ts uint64) (*util.RunResult, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Run", ts) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(*util.RunResult) + ret1, _ := ret[1].(error) + return ret0, ret1 } // Run indicates an expected call of Run. diff --git a/pkg/frontend/types.go b/pkg/frontend/types.go index ba9459172e16e..9f674a22264ca 100644 --- a/pkg/frontend/types.go +++ b/pkg/frontend/types.go @@ -16,6 +16,7 @@ package frontend import ( "context" + "github.com/fagongzi/goetty/v2/buf" "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/config" @@ -28,6 +29,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" "github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/txn/client" + "github.com/matrixorigin/matrixone/pkg/util" ) const ( @@ -41,7 +43,7 @@ type ( ) type ComputationRunner interface { - Run(ts uint64) (err error) + Run(ts uint64) (*util.RunResult, error) } // ComputationWrapper is the wrapper of the computation @@ -55,7 +57,7 @@ type ComputationWrapper interface { GetColumns() ([]interface{}, error) - GetAffectedRows() uint64 + // GetAffectedRows() uint64 Compile(requestCtx context.Context, u interface{}, fill func(interface{}, *batch.Batch) error) (interface{}, error) diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index f0e9e8d4e4408..3fb22568c466a 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -62,6 +62,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/sql/util" + util2 "github.com/matrixorigin/matrixone/pkg/util" "github.com/matrixorigin/matrixone/pkg/util/executor" "github.com/matrixorigin/matrixone/pkg/util/trace" "github.com/matrixorigin/matrixone/pkg/vm" @@ -97,7 +98,6 @@ var analPool = sync.Pool{ func New(addr, db string, sql string, tenant, uid string, ctx context.Context, e engine.Engine, proc *process.Process, stmt tree.Statement, isInternal bool, cnLabel map[string]string) *Compile { c := pool.Get().(*Compile) - c.clear() c.e = e c.db = db c.ctx = ctx @@ -115,6 +115,22 @@ func New(addr, db string, sql string, tenant, uid string, ctx context.Context, return c } +func putCompile(c *Compile) { + if c == nil { + return + } + if c.anal != nil { + for i := range c.anal.analInfos { + analPool.Put(c.anal.analInfos[i]) + } + c.anal.analInfos = nil + } + + c.proc.CleanValueScanBatchs() + c.clear() + pool.Put(c) +} + func (c *Compile) clear() { c.scope = c.scope[:0] c.pn = nil @@ -225,7 +241,7 @@ func (c *Compile) setAffectedRows(n uint64) { c.affectRows.Store(n) } -func (c *Compile) GetAffectedRows() uint64 { +func (c *Compile) getAffectedRows() uint64 { affectRows := c.affectRows.Load() return affectRows } @@ -327,20 +343,17 @@ func (c *Compile) run(s *Scope) error { } // Run is an important function of the compute-layer, it executes a single sql according to its scope -func (c *Compile) Run(_ uint64) error { +func (c *Compile) Run(_ uint64) (*util2.RunResult, error) { + var cc *Compile _, task := gotrace.NewTask(context.TODO(), "pipeline.Run") defer task.End() defer func() { - if c.anal != nil { - for i := range c.anal.analInfos { - analPool.Put(c.anal.analInfos[i]) - } - c.anal.analInfos = nil - } - - c.proc.CleanValueScanBatchs() - pool.Put(c) + putCompile(c) + putCompile(cc) }() + result := &util2.RunResult{ + AffectRows: 0, + } if c.proc.TxnOperator != nil { c.proc.TxnOperator.GetWorkspace().IncrSQLCount() c.proc.TxnOperator.ResetRetry(false) @@ -357,16 +370,16 @@ func (c *Compile) Run(_ uint64) error { // clear the workspace of the failed statement if e := c.proc.TxnOperator.GetWorkspace().RollbackLastStatement(c.ctx); e != nil { - return e + return nil, e } // increase the statement id if e := c.proc.TxnOperator.GetWorkspace().IncrStatementID(c.ctx, false); e != nil { - return e + return nil, e } // FIXME: the current retry method is quite bad, the overhead is relatively large, and needs to be // improved to refresh expression in the future. - cc := New( + cc = New( c.addr, c.db, c.sql, @@ -381,28 +394,31 @@ func (c *Compile) Run(_ uint64) error { if moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetryWithDefChanged) { pn, err := c.buildPlanFunc() if err != nil { - return err + return nil, err } c.pn = pn } if err := cc.Compile(c.proc.Ctx, c.pn, c.u, c.fill); err != nil { c.fatalLog(1, err) - return err + return nil, err } if err := cc.runOnce(); err != nil { c.fatalLog(1, err) - return err + return nil, err } // set affectedRows to old compile to return - c.setAffectedRows(cc.GetAffectedRows()) - return c.proc.TxnOperator.GetWorkspace().Adjust() + c.setAffectedRows(cc.getAffectedRows()) + result.AffectRows = cc.getAffectedRows() + return result, c.proc.TxnOperator.GetWorkspace().Adjust() } - return err + return nil, err } + + result.AffectRows = c.getAffectedRows() if c.proc.TxnOperator != nil { - return c.proc.TxnOperator.GetWorkspace().Adjust() + return result, c.proc.TxnOperator.GetWorkspace().Adjust() } - return nil + return result, nil } // run once diff --git a/pkg/sql/compile/compile_test.go b/pkg/sql/compile/compile_test.go index 5846ea8a2f910..c37d8b498aab7 100644 --- a/pkg/sql/compile/compile_test.go +++ b/pkg/sql/compile/compile_test.go @@ -125,8 +125,8 @@ func TestCompile(t *testing.T) { c := New("test", "test", tc.sql, "", "", context.TODO(), tc.e, tc.proc, tc.stmt, false, nil) err := c.Compile(ctx, tc.pn, nil, testPrint) require.NoError(t, err) - c.GetAffectedRows() - err = c.Run(0) + c.getAffectedRows() + _, err = c.Run(0) require.NoError(t, err) // Enable memory check tc.proc.FreeVectors() @@ -144,8 +144,8 @@ func TestCompileWithFaults(t *testing.T) { c := New("test", "test", tc.sql, "", "", context.TODO(), tc.e, tc.proc, nil, false, nil) err := c.Compile(ctx, tc.pn, nil, testPrint) require.NoError(t, err) - c.GetAffectedRows() - err = c.Run(0) + c.getAffectedRows() + _, err = c.Run(0) require.NoError(t, err) } diff --git a/pkg/sql/compile/sql_executor.go b/pkg/sql/compile/sql_executor.go index a4e09c8668857..91a2a4721e254 100644 --- a/pkg/sql/compile/sql_executor.go +++ b/pkg/sql/compile/sql_executor.go @@ -30,6 +30,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect" "github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/txn/client" + "github.com/matrixorigin/matrixone/pkg/util" "github.com/matrixorigin/matrixone/pkg/util/executor" "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/process" @@ -249,12 +250,14 @@ func (exec *txnExecutor) Exec(sql string) (executor.Result, error) { if err != nil { return executor.Result{}, err } - if err := c.Run(0); err != nil { + var runResult *util.RunResult + runResult, err = c.Run(0) + if err != nil { return executor.Result{}, err } result.Batches = batches - result.AffectedRows = c.GetAffectedRows() + result.AffectedRows = runResult.AffectRows return result, nil } diff --git a/pkg/util/run_result.go b/pkg/util/run_result.go new file mode 100644 index 0000000000000..272fba8f014e1 --- /dev/null +++ b/pkg/util/run_result.go @@ -0,0 +1,19 @@ +// Copyright 2023 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +type RunResult struct { + AffectRows uint64 +}