Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor fault injection #20509

Merged
merged 17 commits into from
Dec 7, 2024
11 changes: 5 additions & 6 deletions pkg/cnservice/server_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package cnservice

import (
"context"
"github.com/matrixorigin/matrixone/pkg/util/fault"
"runtime/debug"
"strings"

Expand Down Expand Up @@ -95,7 +96,7 @@ func (s *service) initQueryCommandHandler() {
s.queryService.AddHandleFunc(query.CmdMethod_FileServiceCache, s.handleFileServiceCacheRequest, false)
s.queryService.AddHandleFunc(query.CmdMethod_FileServiceCacheEvict, s.handleFileServiceCacheEvictRequest, false)
s.queryService.AddHandleFunc(query.CmdMethod_MetadataCache, s.handleMetadataCacheRequest, false)
s.queryService.AddHandleFunc(query.CmdMethod_FaultInjection, s.handleFaultInjection, false)
s.queryService.AddHandleFunc(query.CmdMethod_FaultInject, s.handleFaultInjection, false)
}

func (s *service) handleKillConn(ctx context.Context, req *query.Request, resp *query.Response, _ *morpc.Buffer) error {
Expand Down Expand Up @@ -147,11 +148,9 @@ func (s *service) handleTraceSpan(ctx context.Context, req *query.Request, resp
}

func (s *service) handleFaultInjection(ctx context.Context, req *query.Request, resp *query.Response, _ *morpc.Buffer) error {
resp.TraceSpanResponse = new(query.TraceSpanResponse)
resp.FaultInjectionResponse.Resp = ctl.HandleCnFaultInjection(
ctx, req.FaultInjectionRequest.Name,
req.FaultInjectionRequest.Freq, req.FaultInjectionRequest.Action,
req.FaultInjectionRequest.Iarg, req.FaultInjectionRequest.Sarg,
resp.FaultInjectResponse = new(query.FaultInjectResponse)
resp.FaultInjectResponse.Resp = fault.HandleFaultInject(
ctx, req.FaultInjectRequest.Method, req.FaultInjectRequest.Parameters,
)
return nil
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/objectio/injects.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func InjectLogging(
"echo",
iarg,
sarg,
false,
); err != nil {
return
}
Expand All @@ -221,6 +222,7 @@ func InjectLog1(
"echo",
iarg,
tableName,
false,
); err != nil {
return
}
Expand All @@ -231,6 +233,7 @@ func InjectLog1(
"echo",
iarg,
tableName,
false,
); err != nil {
fault.RemoveFaultPoint(context.Background(), FJ_LogReader)
return
Expand All @@ -243,6 +246,7 @@ func InjectLog1(
"echo",
iarg,
tableName,
false,
); err != nil {
fault.RemoveFaultPoint(context.Background(), FJ_LogReader)
fault.RemoveFaultPoint(context.Background(), FJ_TracePartitionState)
Expand Down Expand Up @@ -282,6 +286,7 @@ func InjectRanges(
"echo",
int64(MakeInjectTableLoggingIntArg(0, true)),
tableName,
false,
); err != nil {
return
}
Expand Down
309 changes: 156 additions & 153 deletions pkg/pb/api/api.pb.go

Large diffs are not rendered by default.

722 changes: 292 additions & 430 deletions pkg/pb/query/query.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/queryservice/client/query_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ var methodVersions = map[pb.CmdMethod]int64{
pb.CmdMethod_FileServiceCacheEvict: defines.MORPCVersion3,
pb.CmdMethod_MetadataCache: defines.MORPCVersion4,
pb.CmdMethod_GOGCPercent: defines.MORPCVersion4,
pb.CmdMethod_FaultInjection: defines.MORPCVersion4,
pb.CmdMethod_FaultInject: defines.MORPCVersion4,
}

type queryClient struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/compile/compile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func TestCompileWithFaults(t *testing.T) {
require.NoError(t, pc.Close())
}()

fault.AddFaultPoint(ctx, "panic_in_batch_append", ":::", "panic", 0, "")
fault.AddFaultPoint(ctx, "panic_in_batch_append", ":::", "panic", 0, "", false)
tc := newTestCase("select * from R join S on R.uid = S.uid", t)
ctrl := gomock.NewController(t)
txnCli, txnOp := newTestTxnClientAndOp(ctrl)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/compile/remoterunClient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestRemoteRun(t *testing.T) {
runtime.ServiceRuntime("").SetGlobalVariables(runtime.PipelineClient, tPCli)

fault.Enable()
fault.AddFaultPoint(ctx, "inject_send_pipeline", ":::", "echo", 0, "test_tbl")
fault.AddFaultPoint(ctx, "inject_send_pipeline", ":::", "echo", 0, "test_tbl", false)

txnCli, txnOp := newTestTxnClientAndOp(ctrl)
proc.Base.TxnClient = txnCli
Expand Down
160 changes: 1 addition & 159 deletions pkg/sql/plan/function/ctl/cmd_addfaultpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,17 @@
package ctl

import (
"context"
"github.com/fagongzi/util/protoc"
"github.com/matrixorigin/matrixone/pkg/clusterservice"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/api"
"github.com/matrixorigin/matrixone/pkg/pb/metadata"
"github.com/matrixorigin/matrixone/pkg/pb/query"
"github.com/matrixorigin/matrixone/pkg/util/fault"
"github.com/matrixorigin/matrixone/pkg/vm/engine/cmd_util"
"github.com/matrixorigin/matrixone/pkg/vm/process"
"strconv"
"strings"
)

func handleTNAddFaultPoint() handleFunc {
func handleAddFaultPoint() handleFunc {
return GetTNHandlerFunc(
api.OpCode_OpAddFaultPoint,
func(string) ([]uint64, error) { return nil, nil },
Expand Down Expand Up @@ -78,156 +73,3 @@ func handleTNAddFaultPoint() handleFunc {
return resp, nil
})
}

// CNResponse is the outcome of a fault injection command on a single CN.
// handleAddFaultPoint collects one CNResponse per targeted CN and returns
// them as the Result data; empty fields are omitted from the JSON encoding.
type CNResponse struct {
	// CNid is the service ID of the CN this entry describes.
	CNid string `json:"cn_id,omitempty"`
	// ReturnStr is the text returned by the CN that handled the command.
	ReturnStr string `json:"return_str,omitempty"`
	// ErrorStr is the error text recorded when forwarding the command to
	// the CN failed.
	ErrorStr string `json:"error_str,omitempty"`
}

// handleAddFaultPoint serves the add-fault-point ctl command.
//
// Only the cn and tn services are accepted; any other service yields a
// wrong-service error. TN requests are delegated to handleTNAddFaultPoint.
// For CN requests, parameter must look like "cnid.name.freq.action.iarg.sarg"
// (see getInputs), where cnid is a comma-separated list of CN service IDs or
// the single word "all" (case-insensitive) to target every CN known to the
// cluster service.
//
// The command is executed locally when the target is the current CN and is
// forwarded through the query client otherwise. A failure on one CN is
// recorded in that CN's CNResponse.ErrorStr and does not abort the remaining
// targets. The returned Result carries one CNResponse per target, in order.
func handleAddFaultPoint(
	proc *process.Process,
	service serviceType,
	parameter string,
	sender requestSender,
) (Result, error) {
	if service != cn && service != tn {
		return Result{}, moerr.NewWrongServiceNoCtx("CN or DN", string(service))
	}

	if service == tn {
		return handleTNAddFaultPoint()(proc, service, parameter, sender)
	}

	// parameter like "cnid.name.freq.action.iarg.sarg"
	name, freq, action, iarg, sarg, cns, err := getInputs(proc.Ctx, parameter)
	if err != nil {
		return Result{}, err
	}

	// "all" expands to every CN currently registered in the cluster.
	if len(cns) == 1 && strings.EqualFold(cns[0], "all") {
		cns = make([]string, 0)
		clusterservice.GetMOCluster(proc.GetService()).GetCNService(
			clusterservice.Selector{},
			func(svc metadata.CNService) bool {
				cns = append(cns, svc.ServiceID)
				return true
			},
		)
	}

	cnRes := make([]CNResponse, 0, len(cns))

	for _, id := range cns {
		res := CNResponse{CNid: id}

		if id == proc.GetQueryClient().ServiceID() {
			// The current CN handles the command in-process.
			res.ReturnStr = HandleCnFaultInjection(proc.Ctx, name, freq, action, iarg, sarg)
		} else {
			request := proc.GetQueryClient().NewRequest(query.CmdMethod_FaultInjection)
			request.FaultInjectionRequest = &query.FaultInjectionRequest{
				Name:   name,
				Freq:   freq,
				Action: action,
				Iarg:   iarg,
				Sarg:   sarg,
			}

			// Forward the command to the other CN and record its answer.
			resp, err := transferRequest2OtherCNs(proc, id, request)
			switch {
			case err != nil:
				res.ErrorStr = err.Error()
			case resp.FaultInjectionResponse == nil:
				// The answer to a FaultInjection request lives in
				// FaultInjectionResponse, not TraceSpanResponse; report a
				// missing payload instead of dereferencing nil.
				res.ErrorStr = "empty fault injection response"
			default:
				res.ReturnStr = resp.FaultInjectionResponse.Resp
			}
		}

		cnRes = append(cnRes, res)
	}

	return Result{
		Method: AddFaultPointMethod,
		Data:   cnRes,
	}, nil
}

// getInputs parses a CN fault injection parameter of the form
// "cnid.name.freq.action.iarg.sarg".
//
// The input is split on "."; the first field is further split on "," into
// the list of target CN IDs and the second field is the fault point name,
// which must not be empty. Two shapes are accepted:
//
//   - "cnid.name": only cns and name are set, the other results keep their
//     zero values;
//   - "cnid.name.freq.action.iarg.sarg": all six fields are set, with iarg
//     parsed as a base-10 signed 64-bit integer.
//
// Any other field count, or an empty name, yields an internal error. A
// malformed iarg yields the strconv error unchanged.
func getInputs(ctx context.Context, input string) (
	name string,
	freq string,
	action string,
	iarg int64,
	sarg string,
	cns []string,
	err error,
) {
	const usage = "invalid argument! should be like \"cnid.name.freq.action.iarg.sarg\""

	args := strings.Split(input, ".")
	// cn uuid
	if len(args) < 2 {
		err = moerr.NewInternalError(ctx, usage)
		return
	}
	cns = strings.Split(args[0], ",")
	name = args[1]
	if name == "" {
		err = moerr.NewInternalError(ctx, "fault point's name should not be empty!")
		return
	}
	if len(args) == 2 {
		return
	}
	if len(args) != 6 {
		err = moerr.NewInternalError(ctx, usage)
		return
	}
	freq = args[2]
	action = args[3]
	// iarg is an int64, so parse it as one explicitly instead of going
	// through the platform-sized int that strconv.Atoi returns.
	if iarg, err = strconv.ParseInt(args[4], 10, 64); err != nil {
		return
	}
	sarg = args[5]

	return
}

// Reserved names that HandleCnFaultInjection treats as control commands
// instead of fault point names.
const (
	// enable turns fault injection on.
	enable = "enable_fault_injection"
	// disable turns fault injection off.
	disable = "disable_fault_injection"
	// status reports whether fault injection is currently enabled.
	status = "status_fault_injection"
)

// HandleCnFaultInjection executes a fault injection command on the local CN
// and returns a human-readable result string.
//
// When name is one of the reserved commands (enable, disable, status) the
// fault injection switch is toggled or queried and the remaining arguments
// are ignored. Any other name is registered as a fault point through
// fault.AddFaultPoint with the given freq, action, iarg and sarg; the result
// is "OK" on success or the error text on failure.
func HandleCnFaultInjection(
	ctx context.Context,
	name string,
	freq string,
	action string,
	iarg int64,
	sarg string,
) (res string) {
	switch name {
	case enable:
		// fault.Enable reporting true is rendered as "was disabled before".
		previous := "enabled"
		if fault.Enable() {
			previous = "disabled"
		}
		return "fault injection enabled, previous status: " + previous

	case disable:
		// fault.Disable reporting true is rendered as "was enabled before".
		previous := "disabled"
		if fault.Disable() {
			previous = "enabled"
		}
		return "fault injection disabled, previous status: " + previous

	case status:
		if fault.Status() {
			return "fault injection is enabled"
		}
		return "fault injection is disabled"
	}

	// Not a reserved command: register name as a fault point.
	if err := fault.AddFaultPoint(ctx, name, freq, action, iarg, sarg); err != nil {
		return err.Error()
	}
	return "OK"
}
Loading
Loading