Skip to content

Commit

Permalink
Refactor fault injection (#20509)
Browse files Browse the repository at this point in the history
新增fault_inject函数,与mo_ctl平级,只有admin可以使用,语法如下:
```
select fault_inject('pod','method','parameter');
```
在原来的fault inject基础上增添了一些新特性
* 指定CN|DN
* 指定fault key 是否为constant
* 更好的可读性

具体用法参考文档:
https://github.com/matrixorigin/docs/blob/main/tech-notes/dnservice/fault_injection.md

Approved by: @daviszhen, @ouyuanning, @LeftHandCold, @m-schen, @qingxinhome, @badboynt1, @reusee, @zhangxu19830126, @aunjgr, @sukki37, @XuPeng-SH
  • Loading branch information
Wenbin1002 authored Dec 7, 2024
1 parent 0f09078 commit 953cf25
Show file tree
Hide file tree
Showing 36 changed files with 1,634 additions and 1,058 deletions.
11 changes: 5 additions & 6 deletions pkg/cnservice/server_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
qclient "github.com/matrixorigin/matrixone/pkg/queryservice/client"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function/ctl"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/util/fault"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -95,7 +96,7 @@ func (s *service) initQueryCommandHandler() {
s.queryService.AddHandleFunc(query.CmdMethod_FileServiceCache, s.handleFileServiceCacheRequest, false)
s.queryService.AddHandleFunc(query.CmdMethod_FileServiceCacheEvict, s.handleFileServiceCacheEvictRequest, false)
s.queryService.AddHandleFunc(query.CmdMethod_MetadataCache, s.handleMetadataCacheRequest, false)
s.queryService.AddHandleFunc(query.CmdMethod_FaultInjection, s.handleFaultInjection, false)
s.queryService.AddHandleFunc(query.CmdMethod_FaultInject, s.handleFaultInjection, false)
s.queryService.AddHandleFunc(query.CmdMethod_CtlMoTableStats, s.handleMoTableStats, false)
}

Expand Down Expand Up @@ -148,11 +149,9 @@ func (s *service) handleTraceSpan(ctx context.Context, req *query.Request, resp
}

func (s *service) handleFaultInjection(ctx context.Context, req *query.Request, resp *query.Response, _ *morpc.Buffer) error {
resp.TraceSpanResponse = new(query.TraceSpanResponse)
resp.FaultInjectionResponse.Resp = ctl.HandleCnFaultInjection(
ctx, req.FaultInjectionRequest.Name,
req.FaultInjectionRequest.Freq, req.FaultInjectionRequest.Action,
req.FaultInjectionRequest.Iarg, req.FaultInjectionRequest.Sarg,
resp.FaultInjectResponse = new(query.FaultInjectResponse)
resp.FaultInjectResponse.Resp = fault.HandleFaultInject(
ctx, req.FaultInjectRequest.Method, req.FaultInjectRequest.Parameters,
)
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/frontend/mysql_cmd_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1567,7 +1567,7 @@ func Test_panic(t *testing.T) {
defer ctrl.Finish()

runPanic := func(panicChoice int64) {
fault.AddFaultPoint(context.Background(), "exec_request_panic", ":::", "panic", panicChoice, "has panic")
fault.AddFaultPoint(context.Background(), "exec_request_panic", ":::", "panic", panicChoice, "has panic", false)
defer fault.RemoveFaultPoint(context.Background(), "exec_request_panic")

ses := newTestSession(t, ctrl)
Expand Down Expand Up @@ -1595,7 +1595,7 @@ func Test_run_panic(t *testing.T) {
defer ctrl.Finish()

runPanic := func(panicChoice int64) {
fault.AddFaultPoint(context.Background(), "executeStmtWithWorkspace_panic", ":::", "panic", panicChoice, "has panic")
fault.AddFaultPoint(context.Background(), "executeStmtWithWorkspace_panic", ":::", "panic", panicChoice, "has panic", false)
defer fault.RemoveFaultPoint(context.Background(), "executeStmtWithWorkspace_panic")

ses := newTestSession(t, ctrl)
Expand Down
5 changes: 5 additions & 0 deletions pkg/objectio/injects.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func InjectLogging(
"echo",
iarg,
sarg,
false,
); err != nil {
return
}
Expand All @@ -221,6 +222,7 @@ func InjectLog1(
"echo",
iarg,
tableName,
false,
); err != nil {
return
}
Expand All @@ -231,6 +233,7 @@ func InjectLog1(
"echo",
iarg,
tableName,
false,
); err != nil {
fault.RemoveFaultPoint(context.Background(), FJ_LogReader)
return
Expand All @@ -243,6 +246,7 @@ func InjectLog1(
"echo",
iarg,
tableName,
false,
); err != nil {
fault.RemoveFaultPoint(context.Background(), FJ_LogReader)
fault.RemoveFaultPoint(context.Background(), FJ_TracePartitionState)
Expand Down Expand Up @@ -282,6 +286,7 @@ func InjectRanges(
"echo",
int64(MakeInjectTableLoggingIntArg(0, true)),
tableName,
false,
); err != nil {
return
}
Expand Down
318 changes: 161 additions & 157 deletions pkg/pb/api/api.pb.go

Large diffs are not rendered by default.

729 changes: 295 additions & 434 deletions pkg/pb/query/query.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/queryservice/client/query_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ var methodVersions = map[pb.CmdMethod]int64{
pb.CmdMethod_FileServiceCacheEvict: defines.MORPCVersion3,
pb.CmdMethod_MetadataCache: defines.MORPCVersion4,
pb.CmdMethod_GOGCPercent: defines.MORPCVersion4,
pb.CmdMethod_FaultInjection: defines.MORPCVersion4,
pb.CmdMethod_FaultInject: defines.MORPCVersion4,
pb.CmdMethod_CtlMoTableStats: defines.MORPCVersion4,
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/compile/compile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func TestCompileWithFaults(t *testing.T) {
require.NoError(t, pc.Close())
}()

fault.AddFaultPoint(ctx, "panic_in_batch_append", ":::", "panic", 0, "")
fault.AddFaultPoint(ctx, "panic_in_batch_append", ":::", "panic", 0, "", false)
tc := newTestCase("select * from R join S on R.uid = S.uid", t)
ctrl := gomock.NewController(t)
txnCli, txnOp := newTestTxnClientAndOp(ctrl)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/compile/remoterunClient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestRemoteRun(t *testing.T) {
runtime.ServiceRuntime("").SetGlobalVariables(runtime.PipelineClient, tPCli)

fault.Enable()
fault.AddFaultPoint(ctx, "inject_send_pipeline", ":::", "echo", 0, "test_tbl")
fault.AddFaultPoint(ctx, "inject_send_pipeline", ":::", "echo", 0, "test_tbl", false)

txnCli, txnOp := newTestTxnClientAndOp(ctrl)
proc.Base.TxnClient = txnCli
Expand Down
160 changes: 1 addition & 159 deletions pkg/sql/plan/function/ctl/cmd_addfaultpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,17 @@
package ctl

import (
"context"
"github.com/fagongzi/util/protoc"
"github.com/matrixorigin/matrixone/pkg/clusterservice"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/api"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"github.com/matrixorigin/matrixone/pkg/pb/query"
"github.com/matrixorigin/matrixone/pkg/util/fault"
"github.com/matrixorigin/matrixone/pkg/vm/engine/cmd_util"
"github.com/matrixorigin/matrixone/pkg/vm/process"
"strconv"
"strings"
)

func handleTNAddFaultPoint() handleFunc {
func handleAddFaultPoint() handleFunc {
return GetTNHandlerFunc(
api.OpCode_OpAddFaultPoint,
func(string) ([]uint64, error) { return nil, nil },
Expand Down Expand Up @@ -78,156 +73,3 @@ func handleTNAddFaultPoint() handleFunc {
return resp, nil
})
}

type CNResponse struct {
CNid string `json:"cn_id,omitempty"`
ReturnStr string `json:"return_str,omitempty"`
ErrorStr string `json:"error_str,omitempty"`
}

func handleAddFaultPoint(
proc *process.Process,
service serviceType,
parameter string,
sender requestSender,
) (Result, error) {
if service != cn && service != tn {
return Result{}, moerr.NewWrongServiceNoCtx("CN or DN", string(service))
}

if service == tn {
return handleTNAddFaultPoint()(proc, service, parameter, sender)
}

// parameter like "cnid.name.freq.action.iarg.sarg"
name, freq, action, iarg, sarg, cns, err := getInputs(proc.Ctx, parameter)
if err != nil {
return Result{}, err
}

if len(cns) == 1 && strings.ToLower(cns[0]) == "all" {
cns = make([]string, 0)
clusterservice.GetMOCluster(proc.GetService()).GetCNService(clusterservice.Selector{}, func(cn metadata.CNService) bool {
cns = append(cns, cn.ServiceID)
return true
})
}

cnRes := make([]CNResponse, 0)

for idx := range cns {
res := CNResponse{
CNid: cns[idx],
}
// the current cn also need to process this span cmd
if cns[idx] == proc.GetQueryClient().ServiceID() {
res.ReturnStr = HandleCnFaultInjection(proc.Ctx, name, freq, action, iarg, sarg)
} else {
request := proc.GetQueryClient().NewRequest(query.CmdMethod_FaultInjection)
request.FaultInjectionRequest = &query.FaultInjectionRequest{
Name: name,
Freq: freq,
Action: action,
Iarg: iarg,
Sarg: sarg,
}
// transfer query to another cn and receive its response
resp, err := transferRequest2OtherCNs(proc, cns[idx], request)
if err != nil {
res.ErrorStr = err.Error()
} else {
res.ReturnStr = resp.TraceSpanResponse.Resp
}
}
cnRes = append(cnRes, res)
}

return Result{
Method: AddFaultPointMethod,
Data: cnRes,
}, nil
}

func getInputs(ctx context.Context, input string) (
name string,
freq string,
action string,
iarg int64,
sarg string,
cns []string,
err error,
) {
args := strings.Split(input, ".")
// cn uuid
if len(args) < 2 {
err = moerr.NewInternalError(ctx, "invalid argument! should be like \"cnid.name.freq.action.iarg.sarg\"")
return
}
cns = strings.Split(args[0], ",")
name = args[1]
if name == "" {
err = moerr.NewInternalError(ctx, "fault point's name should not be empty!")
return
}
if len(args) == 2 {
return
}
if len(args) != 6 {
err = moerr.NewInternalError(ctx, "invalid argument! should be like \"cnid.name.freq.action.iarg.sarg\"")
return
}
freq = args[2]
action = args[3]
i, err := strconv.Atoi(args[4])
iarg = int64(i)
if err != nil {
return
}
sarg = args[5]

return
}

const (
enable = "enable_fault_injection"
disable = "disable_fault_injection"
status = "status_fault_injection"
)

func HandleCnFaultInjection(
ctx context.Context,
name string,
freq string,
action string,
iarg int64,
sarg string,
) (res string) {
switch name {
case enable:
res = "fault injection enabled, previous status: "
if fault.Enable() {
res += "disabled"
} else {
res += "enabled"
}
case disable:
res = "fault injection disabled, previous status: "
if fault.Disable() {
res += "enabled"
} else {
res += "disabled"
}
case status:
if fault.Status() {
res = "fault injection is enabled"
} else {
res = "fault injection is disabled"
}
default:
if err := fault.AddFaultPoint(ctx, name, freq, action, iarg, sarg); err != nil {
return err.Error()
}
res = "OK"
}
return
}
Loading

0 comments on commit 953cf25

Please sign in to comment.