Skip to content

Commit

Permalink
refactor the context field for Process. (#17823)
Browse files Browse the repository at this point in the history
1. refactor the process's context and process holds three context now: clientContext, queryContext and pipelineContext.
2. add lots of methods for process.
3. remove lots of useless codes.

Approved by: @reusee, @daviszhen, @badboynt1, @aunjgr, @qingxinhome, @XuPeng-SH, @ouyuanning, @LeftHandCold, @zhangxu19830126
  • Loading branch information
m-schen authored Aug 6, 2024
1 parent e0efeee commit 7f9a697
Show file tree
Hide file tree
Showing 70 changed files with 843 additions and 771 deletions.
2 changes: 2 additions & 0 deletions pkg/bootstrap/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ func (s *service) checkAlreadyBootstrapped(ctx context.Context) (bool, error) {
return true, nil
}
}
// todo: these should do a log here to indicate that the system is not bootstrapped like the following
// "show databases cannot find the bootstrappedCheckerDB."
return false, nil
}

Expand Down
8 changes: 2 additions & 6 deletions pkg/cnservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,8 @@ func (s *service) handleRequest(
s.pipelines.counter.Add(1)
defer s.pipelines.counter.Add(-1)

err := s.requestHandler(ctx,
// there is no need to handle the return error, because the error will be logged in the function.
_ = s.requestHandler(ctx,
s.pipelineServiceServiceAddr(),
req,
cs,
Expand All @@ -462,11 +463,6 @@ func (s *service) handleRequest(
s._txnClient,
s.aicm,
s.acquireMessage)
if err != nil {
logutil.Infof("error occurred while handling the pipeline message, "+
"msg is %v, error is %v",
req, err)
}
}()
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/embed/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ func (c *cluster) Start() error {
}
}(s)
}

wg.Wait()

select {
case err := <-errC:
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/frontend/back_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func doComQueryInBack(
//the ses.GetUserName returns the user_name with the account_name.
//here,we only need the user_name.
userNameOnly := rootName
proc := process.New(
proc := process.NewTopProcess(
execCtx.reqCtx,
backSes.pool,
getGlobalPu().TxnClient,
Expand Down Expand Up @@ -553,7 +553,7 @@ func executeStmtInSameSession(ctx context.Context, ses *Session, execCtx *ExecCt
ses.ReplaceDerivedStmt(prevDerivedStmt)
//@todo we need to improve: make one session, one proc, one txnOperator
p := ses.GetTxnCompileCtx().GetProcess()
p.FreeVectors()
p.Free()
execCtx.proc = proc
ses.GetTxnHandler().SetOptionBits(prevOptionBits)
ses.GetTxnHandler().SetServerStatus(prevServerStatus)
Expand Down
8 changes: 3 additions & 5 deletions pkg/frontend/computation_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (cwft *TxnComputationWrapper) Compile(any any, fill func(*batch.Batch) erro
cwft.compile.SetOriginSQL(originSQL)
} else {
// retComp
cwft.proc.Ctx = execCtx.reqCtx
cwft.proc.ReplaceTopCtx(execCtx.reqCtx)
retComp.Reset(cwft.proc, getStatementStartAt(execCtx.reqCtx), fill, cwft.ses.GetSql())
cwft.compile = retComp
}
Expand All @@ -281,9 +281,7 @@ func (cwft *TxnComputationWrapper) Compile(any any, fill func(*batch.Batch) erro
func updateTempStorageInCtx(execCtx *ExecCtx, proc *process.Process, tempStorage *memorystorage.Storage) {
if execCtx != nil && execCtx.reqCtx != nil {
execCtx.reqCtx = attachValue(execCtx.reqCtx, defines.TemporaryTN{}, tempStorage)
}
if proc != nil && proc.Ctx != nil {
proc.Ctx = attachValue(proc.Ctx, defines.TemporaryTN{}, tempStorage)
proc.ReplaceTopCtx(execCtx.reqCtx)
}
}

Expand Down Expand Up @@ -410,7 +408,7 @@ func createCompile(
if len(getGlobalPu().ClusterNodes) > 0 {
addr = getGlobalPu().ClusterNodes[0].Addr
}
proc.Ctx = execCtx.reqCtx
proc.ReplaceTopCtx(execCtx.reqCtx)
proc.Base.FileService = getGlobalPu().FileService

var tenant string
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/mysql_cmd_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2703,7 +2703,7 @@ func doComQuery(ses *Session, execCtx *ExecCtx, input *UserInput) (retErr error)
ses.tStmt = nil

proc := ses.proc
proc.Ctx = execCtx.reqCtx
proc.ReplaceTopCtx(execCtx.reqCtx)

proc.CopyVectorPool(ses.proc)
proc.CopyValueScanBatch(ses.proc)
Expand Down
3 changes: 2 additions & 1 deletion pkg/frontend/routine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,11 @@ var newMockWrapper = func(ctrl *gomock.Controller, ses *Session,
proto := ses.GetResponser().MysqlRrWr()
if mrs != nil {
if res.isSleepSql {
topCtx := proc.GetTopContext()
select {
case <-time.After(time.Duration(res.seconds) * time.Second):
res.resultX.Store(timeout)
case <-proc.Ctx.Done():
case <-topCtx.Done():
res.resultX.Store(contextCancel)
}
}
Expand Down
26 changes: 11 additions & 15 deletions pkg/frontend/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func NewSession(
panic(err)
}
}
ses.proc = process.New(
ses.proc = process.NewTopProcess(
context.TODO(),
ses.pool,
getGlobalPu().TxnClient,
Expand Down Expand Up @@ -620,29 +620,25 @@ func (ses *Session) Close() {
ses.sqlHelper = nil
}
ses.ClearStmtProfile()
// The mpool cleanup must be placed at the end,
// and you must wait for all resources to be cleaned up before you can delete the mpool
if ses.proc != nil {
ses.proc.FreeVectors()
bats := ses.proc.GetValueScanBatchs()
for _, bat := range bats {
bat.Clean(ses.proc.Mp())
}
ses.proc = nil
}

ses.proc.Free()
ses.proc = nil

for _, bat := range ses.resultBatches {
bat.Clean(ses.pool)
}

pool := ses.GetMemPool()
mpool.DeleteMPool(pool)
ses.SetMemPool(nil)

if ses.buf != nil {
ses.buf.Free()
ses.buf = nil
}

// The mpool cleanup must be placed at the end,
// and you must wait for all resources to be cleaned up before you can delete the mpool
pool := ses.GetMemPool()
mpool.DeleteMPool(pool)
ses.SetMemPool(nil)

ses.timestampMap = nil
ses.upstream = nil
ses.rm = nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/show_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func requestStorageUsage(ctx context.Context, ses *Session, accIds [][]int64) (r
txnOperator := ses.txnHandler.GetTxn()

// create a new proc for `handler`
proc := process.New(ctx, ses.proc.GetMPool(),
proc := process.NewTopProcess(ctx, ses.proc.GetMPool(),
ses.proc.Base.TxnClient, txnOperator,
ses.proc.Base.FileService, ses.proc.Base.LockService,
ses.proc.Base.QueryClient, ses.proc.Base.Hakeeper,
Expand Down
8 changes: 4 additions & 4 deletions pkg/frontend/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,20 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

const (
DefaultRpcBufferSize = 1 << 10
)

type (
TxnOperator = client.TxnOperator
TxnClient = client.TxnClient
TxnOption = client.TxnOption
)

type ComputationRunner interface {
// todo: remove the ts next day, that's useless.
Run(ts uint64) (*util.RunResult, error)
}

// compile.Compile should implement ComputationRunner to support Run method.
var _ ComputationRunner = &compile.Compile{}

// ComputationWrapper is the wrapper of the computation
type ComputationWrapper interface {
ComputationRunner
Expand Down
7 changes: 4 additions & 3 deletions pkg/frontend/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,10 @@ func GetSimpleExprValue(ctx context.Context, e tree.Expr, ses *Session) (interfa
}
// set @a = 'on', type of a is bool. And mo cast rule does not fit set variable rule so delay to convert type.
// Here the evalExpr may execute some function that needs engine.Engine.
ses.txnCompileCtx.GetProcess().Ctx = attachValue(ses.txnCompileCtx.GetProcess().Ctx,
defines.EngineKey{},
ses.GetTxnHandler().GetStorage())
ses.txnCompileCtx.GetProcess().ReplaceTopCtx(
attachValue(ses.txnCompileCtx.GetProcess().GetTopContext(),
defines.EngineKey{},
ses.GetTxnHandler().GetStorage()))

vec, err := colexec.EvalExpressionOnce(ses.txnCompileCtx.GetProcess(), planExpr, []*batch.Batch{batch.EmptyForConstFoldBatch})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,7 @@ func Test_makeExecuteSql(t *testing.T) {
}
defer mpool.DeleteMPool(mp)

testProc := process.New(context.Background(), mp, nil, nil, nil, nil, nil, nil, nil, nil)
testProc := process.NewTopProcess(context.Background(), mp, nil, nil, nil, nil, nil, nil, nil, nil)

params1 := testProc.GetVector(types.T_text.ToType())
for i := 0; i < 3; i++ {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/connector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func TestConnector(t *testing.T) {
}
tc.arg.Free(tc.proc, false, nil)
tc.arg.GetChildren(0).Free(tc.proc, false, nil)
tc.proc.FreeVectors()
tc.proc.Free()
require.Equal(t, int64(0), tc.proc.Mp().CurrNB())
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/deletion/deletion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,6 @@ func TestNormalDeletion(t *testing.T) {
require.NoError(t, err)
arg.Free(proc, false, nil)
arg.GetChildren(0).Free(proc, false, nil)
proc.FreeVectors()
proc.Free()
require.Equal(t, int64(0), proc.GetMPool().CurrNB())
}
2 changes: 1 addition & 1 deletion pkg/sql/colexec/dispatch/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestDispatch(t *testing.T) {
msg.Batch.Clean(tc.proc.Mp())
}
}
tc.proc.FreeVectors()
tc.proc.Free()
require.Equal(t, int64(0), tc.proc.Mp().CurrNB())
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/evalExpression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func TestFunctionExpressionExecutor(t *testing.T) {
require.NoError(t, err)
require.Equal(t, curr3, proc.Mp().CurrNB())
fExprExecutor.Free()
proc.FreeVectors()
proc.Free()
require.Equal(t, currStart, proc.Mp().CurrNB())
}

Expand Down Expand Up @@ -211,7 +211,7 @@ func TestFunctionExpressionExecutor(t *testing.T) {
_, ok := executor.(*FixedVectorExpressionExecutor)
require.Equal(t, true, ok)
executor.Free()
proc.FreeVectors()
proc.Free()
require.Equal(t, currNb, proc.Mp().CurrNB())
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/group/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestGroup(t *testing.T) {

tc.arg.Free(tc.proc, false, nil)
tc.arg.GetChildren(0).Free(tc.proc, false, nil)
tc.proc.FreeVectors()
tc.proc.Free()
require.Equal(t, int64(0), tc.proc.Mp().CurrNB())
}
}
Expand Down Expand Up @@ -175,7 +175,7 @@ func BenchmarkGroup(b *testing.B) {

tc.arg.Free(tc.proc, false, nil)
tc.arg.GetChildren(0).Free(tc.proc, false, nil)
tc.proc.FreeVectors()
tc.proc.Free()
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/insert/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestInsertOperator(t *testing.T) {

argument1.Free(proc, false, nil)
argument1.GetChildren(0).Free(proc, false, nil)
proc.FreeVectors()
proc.Free()
require.Equal(t, int64(0), proc.GetMPool().CurrNB())
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/intersect/intersect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestIntersect(t *testing.T) {
c.proc.Reg.MergeReceivers[0].Ch <- nil
c.proc.Reg.MergeReceivers[1].Ch <- nil
c.arg.Free(c.proc, false, nil)
proc.FreeVectors()
proc.Free()
require.Equal(t, int64(0), c.proc.Mp().CurrNB())
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/intersectall/intersectall_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestIntersectAll(t *testing.T) {
c.proc.Reg.MergeReceivers[0].Ch <- nil
c.proc.Reg.MergeReceivers[1].Ch <- nil
c.arg.Free(c.proc, false, nil)
c.proc.FreeVectors()
c.proc.Free()
require.Equal(t, int64(0), c.proc.Mp().CurrNB())
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/limit/limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestLimit(t *testing.T) {
_, _ = tc.arg.Call(tc.proc)
tc.arg.Free(tc.proc, false, nil)
tc.arg.GetChildren(0).Free(tc.proc, false, nil)
tc.proc.FreeVectors()
tc.proc.Free()
require.Equal(t, int64(0), tc.proc.Mp().CurrNB())
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/lockop/lock_op_no_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func getInternalProcessByUniqueID(
return nil, err
}
v, _ := runtime.ServiceRuntime(sid).GetGlobalVariables(runtime.LockService)
proc := process.New(
proc := process.NewTopProcess(
ctx,
mp,
txnClient,
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/colexec/lockop/lock_op_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func TestLockWithBlocking(t *testing.T) {
},
func(arg *LockOp, proc *process.Process) {
arg.Free(proc, false, nil)
proc.FreeVectors()
proc.Free()
},
)
}
Expand Down Expand Up @@ -408,7 +408,7 @@ func TestLockWithBlockingWithConflict(t *testing.T) {
bat.Clean(proc.Mp())
}
arg.Free(proc, false, nil)
proc.FreeVectors()
proc.Free()
},
)
}
Expand Down Expand Up @@ -590,7 +590,7 @@ func runLockOpTest(
txnOp, err := c.New(ctx, timestamp.Timestamp{})
require.NoError(t, err)

proc := process.New(
proc := process.NewTopProcess(
ctx,
mpool.MustNewZero(),
c,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/merge/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestMerge(t *testing.T) {
}
}
tc.arg.Free(tc.proc, false, nil)
tc.proc.FreeVectors()
tc.proc.Free()
require.Equal(t, int64(0), tc.proc.Mp().CurrNB())
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/mergeblock/mergeblock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestMergeBlock(t *testing.T) {
argument1.container.mp[k].Clean(proc.GetMPool())
}
argument1.GetChildren(0).Free(proc, false, nil)
proc.FreeVectors()
proc.Free()
require.Equal(t, int64(0), proc.GetMPool().CurrNB())
}

Expand Down Expand Up @@ -244,7 +244,7 @@ func TestArgument_GetMetaLocBat(t *testing.T) {
arg.container.mp[k].Clean(proc.GetMPool())
}

proc.FreeVectors()
proc.Free()
bat.Clean(proc.GetMPool())
require.Equal(t, int64(0), proc.GetMPool().CurrNB())
}
2 changes: 1 addition & 1 deletion pkg/sql/colexec/mergeorder/order_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestOrder(t *testing.T) {
}
}
}
tc.proc.FreeVectors()
tc.proc.Free()
tc.arg.Free(tc.proc, false, nil)
require.Equal(t, int64(0), tc.proc.Mp().CurrNB())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/minus/minus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestMinus(t *testing.T) {
c.proc.Reg.MergeReceivers[1].Ch <- nil

c.arg.Free(c.proc, false, nil)
c.proc.FreeVectors()
c.proc.Free()
require.Equal(t, int64(0), c.proc.Mp().CurrNB())
}

Expand Down
Loading

0 comments on commit 7f9a697

Please sign in to comment.