Skip to content

Commit

Permalink
clean compile before put compile to pool (#11382)
Browse files Browse the repository at this point in the history
原来是在从pool get的时候clear。
会让调用方误以为Run这个函数使用之后,Compile对象还存在。但是实际上它是可能存在,也可能不存在。
因为当并发很高,其他地方快速从pool中get一个compile出来,然后进行clear的话。那么会导致前面那个调用方所拿到的compile中的数据是错误的(比如affect_rows,因为被另外一个调用方clear了)

这个重构,给Run方法增加了返回值。如果需要信息,调用方应该从返回值中拿。当Run结束后,应该认为compile对象内的所有信息已经被清空。

Approved by: @m-schen, @nnsgmsone, @zhangxu19830126, @daviszhen, @qingxinhome
  • Loading branch information
ouyuanning authored Aug 22, 2023
1 parent 4d0893f commit 8404a1f
Show file tree
Hide file tree
Showing 11 changed files with 122 additions and 60 deletions.
10 changes: 9 additions & 1 deletion pkg/frontend/cmd_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
"github.com/matrixorigin/matrixone/pkg/util"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/frontend/constant"
Expand Down Expand Up @@ -254,9 +255,14 @@ type baseStmtExecutor struct {
ComputationWrapper
tenantName string
status stmtExecStatus
runResult *util.RunResult
err error
}

func (bse *baseStmtExecutor) GetAffectedRows() uint64 {
return bse.runResult.AffectRows
}

func (bse *baseStmtExecutor) GetStatus() stmtExecStatus {
return bse.status
}
Expand Down Expand Up @@ -311,7 +317,9 @@ func (bse *baseStmtExecutor) CommitOrRollbackTxn(ctx context.Context, ses *Sessi
}

func (bse *baseStmtExecutor) ExecuteImpl(ctx context.Context, ses *Session) error {
return bse.Run(0)
runResult, err := bse.Run(0)
bse.runResult = runResult
return err
}

func (bse *baseStmtExecutor) Setup(ctx context.Context, ses *Session) error {
Expand Down
26 changes: 15 additions & 11 deletions pkg/frontend/computation_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan"
"github.com/matrixorigin/matrixone/pkg/sql/util"
"github.com/matrixorigin/matrixone/pkg/txn/clock"
util2 "github.com/matrixorigin/matrixone/pkg/util"
"github.com/matrixorigin/matrixone/pkg/util/trace/impl/motrace"
"github.com/matrixorigin/matrixone/pkg/vm/engine/memoryengine"
"github.com/matrixorigin/matrixone/pkg/vm/process"
Expand Down Expand Up @@ -81,20 +82,21 @@ func (ncw *NullComputationWrapper) GetUUID() []byte {
return ncw.uuid[:]
}

func (ncw *NullComputationWrapper) Run(ts uint64) error {
return nil
func (ncw *NullComputationWrapper) Run(ts uint64) (*util2.RunResult, error) {
return nil, nil
}

func (ncw *NullComputationWrapper) GetLoadTag() bool {
return false
}

type TxnComputationWrapper struct {
stmt tree.Statement
plan *plan2.Plan
proc *process.Process
ses *Session
compile *compile.Compile
stmt tree.Statement
plan *plan2.Plan
proc *process.Process
ses *Session
compile *compile.Compile
runResult *util2.RunResult

uuid uuid.UUID
}
Expand Down Expand Up @@ -185,7 +187,7 @@ func (cwft *TxnComputationWrapper) GetClock() clock.Clock {
}

func (cwft *TxnComputationWrapper) GetAffectedRows() uint64 {
return cwft.compile.GetAffectedRows()
return cwft.runResult.AffectRows
}

func (cwft *TxnComputationWrapper) GetServerStatus() uint16 {
Expand Down Expand Up @@ -418,13 +420,15 @@ func (cwft *TxnComputationWrapper) GetUUID() []byte {
return cwft.uuid[:]
}

func (cwft *TxnComputationWrapper) Run(ts uint64) error {
func (cwft *TxnComputationWrapper) Run(ts uint64) (*util2.RunResult, error) {
logDebug(cwft.ses, cwft.ses.GetDebugString(), "compile.Run begin")
defer func() {
logDebug(cwft.ses, cwft.ses.GetDebugString(), "compile.Run end")
}()
err := cwft.compile.Run(ts)
return err
runResult, err := cwft.compile.Run(ts)
cwft.runResult = runResult
cwft.compile = nil
return runResult, err
}

func (cwft *TxnComputationWrapper) GetLoadTag() bool {
Expand Down
14 changes: 10 additions & 4 deletions pkg/frontend/mysql_cmd_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan"
"github.com/matrixorigin/matrixone/pkg/sql/plan/explain"
util2 "github.com/matrixorigin/matrixone/pkg/util"
"github.com/matrixorigin/matrixone/pkg/util/metric"
"github.com/matrixorigin/matrixone/pkg/util/trace"
"github.com/matrixorigin/matrixone/pkg/util/trace/impl/motrace"
Expand Down Expand Up @@ -2508,6 +2509,7 @@ func (mce *MysqlCmdExecutor) executeStmt(requestCtx context.Context,
userName string,
) (retErr error) {
var err error
var runResult *util2.RunResult
var cmpBegin time.Time
var ret interface{}
var runner ComputationRunner
Expand Down Expand Up @@ -3185,7 +3187,7 @@ func (mce *MysqlCmdExecutor) executeStmt(requestCtx context.Context,
}
}
// todo: add trace
if err = runner.Run(0); err != nil {
if _, err = runner.Run(0); err != nil {
return err
}

Expand Down Expand Up @@ -3267,7 +3269,7 @@ func (mce *MysqlCmdExecutor) executeStmt(requestCtx context.Context,
}
}

if err = runner.Run(0); err != nil {
if runResult, err = runner.Run(0); err != nil {
if loadLocalErrGroup != nil { // release resources
err2 = proc.LoadLocalReader.Close()
if err2 != nil {
Expand Down Expand Up @@ -3296,7 +3298,11 @@ func (mce *MysqlCmdExecutor) executeStmt(requestCtx context.Context,
logInfo(ses, "time of Exec.Run : %s", time.Since(runBegin).String())
}

rspLen = cw.GetAffectedRows()
if runResult == nil {
rspLen = 0
} else {
rspLen = runResult.AffectRows
}
echoTime := time.Now()

logDebug(ses, "time of SendResponse %s", time.Since(echoTime).String())
Expand Down Expand Up @@ -3352,7 +3358,7 @@ func (mce *MysqlCmdExecutor) executeStmt(requestCtx context.Context,
/*
Step 1: Start
*/
if err = runner.Run(0); err != nil {
if _, err = runner.Run(0); err != nil {
return err
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/frontend/mysql_cmd_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func Test_mce(t *testing.T) {
use_t.EXPECT().RecordExecPlan(ctx).Return(nil).AnyTimes()

runner := mock_frontend.NewMockComputationRunner(ctrl)
runner.EXPECT().Run(gomock.Any()).Return(nil).AnyTimes()
runner.EXPECT().Run(gomock.Any()).Return(nil, nil).AnyTimes()

create_1 := mock_frontend.NewMockComputationWrapper(ctrl)
stmts, err = parsers.Parse(ctx, dialect.MYSQL, "create table A(a varchar(100),b int,c float)", 1)
Expand All @@ -122,7 +122,7 @@ func Test_mce(t *testing.T) {
create_1.EXPECT().GetUUID().Return(make([]byte, 16)).AnyTimes()
create_1.EXPECT().SetDatabaseName(gomock.Any()).Return(nil).AnyTimes()
create_1.EXPECT().Compile(gomock.Any(), gomock.Any(), gomock.Any()).Return(runner, nil).AnyTimes()
create_1.EXPECT().Run(gomock.Any()).Return(nil).AnyTimes()
create_1.EXPECT().Run(gomock.Any()).Return(nil, nil).AnyTimes()
create_1.EXPECT().GetLoadTag().Return(false).AnyTimes()
create_1.EXPECT().GetAffectedRows().Return(uint64(0)).AnyTimes()
create_1.EXPECT().RecordExecPlan(ctx).Return(nil).AnyTimes()
Expand All @@ -136,7 +136,7 @@ func Test_mce(t *testing.T) {
select_1.EXPECT().GetUUID().Return(make([]byte, 16)).AnyTimes()
select_1.EXPECT().SetDatabaseName(gomock.Any()).Return(nil).AnyTimes()
select_1.EXPECT().Compile(gomock.Any(), gomock.Any(), gomock.Any()).Return(runner, nil).AnyTimes()
select_1.EXPECT().Run(gomock.Any()).Return(nil).AnyTimes()
select_1.EXPECT().Run(gomock.Any()).Return(nil, nil).AnyTimes()
select_1.EXPECT().GetLoadTag().Return(false).AnyTimes()
select_1.EXPECT().RecordExecPlan(ctx).Return(nil).AnyTimes()

Expand Down Expand Up @@ -214,7 +214,7 @@ func Test_mce(t *testing.T) {
select_2.EXPECT().GetUUID().Return(make([]byte, 16)).AnyTimes()
select_2.EXPECT().SetDatabaseName(gomock.Any()).Return(nil).AnyTimes()
select_2.EXPECT().Compile(gomock.Any(), gomock.Any(), gomock.Any()).Return(runner, nil).AnyTimes()
select_2.EXPECT().Run(gomock.Any()).Return(nil).AnyTimes()
select_2.EXPECT().Run(gomock.Any()).Return(nil, nil).AnyTimes()
select_2.EXPECT().GetLoadTag().Return(false).AnyTimes()
select_2.EXPECT().GetAffectedRows().Return(uint64(0)).AnyTimes()
select_2.EXPECT().GetColumns().Return(self_handle_sql_columns[i], nil).AnyTimes()
Expand Down
7 changes: 4 additions & 3 deletions pkg/frontend/routine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/sql/parsers"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
util "github.com/matrixorigin/matrixone/pkg/util"
"github.com/matrixorigin/matrixone/pkg/util/metric"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/process"
Expand Down Expand Up @@ -103,7 +104,7 @@ var newMockWrapper = func(ctrl *gomock.Controller, ses *Session,
}
uuid, _ := uuid.NewUUID()
runner := mock_frontend.NewMockComputationRunner(ctrl)
runner.EXPECT().Run(gomock.Any()).DoAndReturn(func(uint64) error {
runner.EXPECT().Run(gomock.Any()).DoAndReturn(func(uint64) (*util.RunResult, error) {
proto := ses.GetMysqlProtocol()
if mrs != nil {
if res.isSleepSql {
Expand All @@ -117,10 +118,10 @@ var newMockWrapper = func(ctrl *gomock.Controller, ses *Session,
err = proto.SendResultSetTextBatchRowSpeedup(mrs, mrs.GetRowCount())
if err != nil {
logutil.Errorf("flush error %v", err)
return err
return nil, err
}
}
return nil
return &util.RunResult{AffectRows: 0}, nil
}).AnyTimes()
mcw := mock_frontend.NewMockComputationWrapper(ctrl)
mcw.EXPECT().GetAst().Return(stmt).AnyTimes()
Expand Down
15 changes: 9 additions & 6 deletions pkg/frontend/test/types_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions pkg/frontend/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package frontend

import (
"context"

"github.com/fagongzi/goetty/v2/buf"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/config"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
"github.com/matrixorigin/matrixone/pkg/sql/plan"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/util"
)

const (
Expand All @@ -41,7 +43,7 @@ type (
)

type ComputationRunner interface {
Run(ts uint64) (err error)
Run(ts uint64) (*util.RunResult, error)
}

// ComputationWrapper is the wrapper of the computation
Expand All @@ -55,7 +57,7 @@ type ComputationWrapper interface {

GetColumns() ([]interface{}, error)

GetAffectedRows() uint64
// GetAffectedRows() uint64

Compile(requestCtx context.Context, u interface{}, fill func(interface{}, *batch.Batch) error) (interface{}, error)

Expand Down
Loading

0 comments on commit 8404a1f

Please sign in to comment.