Skip to content

Commit

Permalink
curve/tool-v2: check copyset log behind
Browse files Browse the repository at this point in the history
Signed-off-by: Cyber-SiKu <[email protected]>
  • Loading branch information
Cyber-SiKu committed Aug 8, 2022
1 parent bb9ee7f commit 207f26b
Show file tree
Hide file tree
Showing 16 changed files with 396 additions and 36 deletions.
1 change: 1 addition & 0 deletions tools-v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/containerd/continuity v0.3.0 // indirect
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/docker/docker v20.10.17+incompatible // indirect
github.com/docker/docker-credential-helpers v0.6.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions tools-v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ github.com/danieljoos/wincred v1.1.0/go.mod h1:XYlo+eRTsVA9aHGp7NGjFkPla4m+DCL7h
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deckarep/golang-set/v2 v2.1.0 h1:g47V4Or+DUdzbs8FxCCmgb6VYd+ptPAngjM6dtGktsI=
github.com/deckarep/golang-set/v2 v2.1.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4=
github.com/denisenkom/go-mssqldb v0.0.0-20191128021309-1d7a30a10f73/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba/go.mod h1:dV8lFg6daOBZbT6/BDGIz6Y3WFGn8juu6G+CQ6LHtl0=
github.com/dgrijalva/jwt-go v0.0.0-20170104182250-a601269ab70c/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
Expand Down
15 changes: 15 additions & 0 deletions tools-v2/internal/error/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,21 @@ var (
ErrTopology = func() *CmdError {
return NewInternalCmdError(29, "%s[%d] belongs to %s[%d] who was not found")
}
ErrCopysetGapKey = func() *CmdError {
return NewInternalCmdError(30, "Fail to parse copyset key! the line is: %s")
}
ErrCopysetGapState = func() *CmdError {
return NewInternalCmdError(30, "Fail to parse copyset[%d] state! the line is: %s")
}
ErrCopysetGapLastLogId = func() *CmdError {
return NewInternalCmdError(31, "Fail to parse copyset[%d] last_log_id! the line is: %s")
}
ErrCopysetGapReplicator = func() *CmdError {
return NewInternalCmdError(32, "Fail to parse copyset[%d] replicator! the line is: %s")
}
ErrCopysetGap = func() *CmdError {
return NewInternalCmdError(33, "Fail to parse copyset[%d]: state or storage or replicator is not found!")
}

// http error
ErrHttpUnreadableResult = func() *CmdError {
Expand Down
15 changes: 11 additions & 4 deletions tools-v2/internal/utils/copyset.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,19 @@ const (
COPYSET_NOTEXIST COPYSET_HEALTH_STATUS = 4
)

const (
COPYSET_OK_STR = "ok"
COPYSET_WARN_STR = "warn"
COPYSET_ERROR_STR = "error"
COPYSET_NOTEXIST_STR = "not exist"
)

var (
CopysetHealthStatus_Str = map[int32]string{
1: "ok",
2: "warn",
3: "error",
4: "not exist",
1: COPYSET_OK_STR,
2: COPYSET_WARN_STR,
3: COPYSET_ERROR_STR,
4: COPYSET_NOTEXIST_STR,
}
)

Expand Down
4 changes: 2 additions & 2 deletions tools-v2/internal/utils/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TranslateBitmapLocation(bitmapLocation string) (common.BitmapLocation, *cmd
return common.BitmapLocation(value), &retErr
}

func SplitPeerToAddr(peer string) (string, *cmderror.CmdError) {
func PeerAddressToAddr(peer string) (string, *cmderror.CmdError) {
items := strings.Split(peer, ":")
if len(items) != 3 {
err := cmderror.ErrSplitPeer()
Expand All @@ -63,7 +63,7 @@ func SplitPeerToAddr(peer string) (string, *cmderror.CmdError) {

func PeertoAddr(peer *common.Peer) (string, *cmderror.CmdError) {
address := peer.GetAddress()
return SplitPeerToAddr(address)
return PeerAddressToAddr(address)
}

const (
Expand Down
1 change: 1 addition & 0 deletions tools-v2/internal/utils/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
ROW_LEADER_PEER = "leaderPeer"
ROW_LEFT = "left"
ROW_LENGTH = "length"
ROW_LOG_GAP = "logGap"
ROW_METASERVER = "metaserver"
ROW_METASERVER_ADDR = "metaserverAddr"
ROW_MOUNT_NUM = "mountNum"
Expand Down
2 changes: 1 addition & 1 deletion tools-v2/pkg/cli/command/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func NewMetric(addrs []string, subUri string, timeout time.Duration) *Metric {
}
}

func QueryMetric(m Metric) (string, *cmderror.CmdError) {
func QueryMetric(m *Metric) (string, *cmderror.CmdError) {
response := make(chan string, 1)
size := len(m.Addrs)
if size > config.MaxChannelSize() {
Expand Down
105 changes: 95 additions & 10 deletions tools-v2/pkg/cli/command/curvefs/check/copyset/copyset.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ package copyset
import (
"fmt"
"sort"
"sync"
"time"

mapset "github.com/deckarep/golang-set/v2"
"github.com/olekukonko/tablewriter"
cmderror "github.com/opencurve/curve/tools-v2/internal/error"
cobrautil "github.com/opencurve/curve/tools-v2/internal/utils"
Expand All @@ -42,9 +45,11 @@ const (

type CopysetCommand struct {
basecmd.FinalCurveCmd
key2Copyset *map[uint64]*cobrautil.CopysetInfoStatus
copysetKey2Status *map[uint64]cobrautil.COPYSET_HEALTH_STATUS
health cobrautil.ClUSTER_HEALTH_STATUS
key2Copyset *map[uint64]*cobrautil.CopysetInfoStatus
copysetKey2Status *map[uint64]cobrautil.COPYSET_HEALTH_STATUS
copysetKey2LeaderInfo *map[uint64]*CopysetLeaderInfo
health cobrautil.ClUSTER_HEALTH_STATUS
leaderAddr mapset.Set[string]
}

var _ basecmd.FinalCurveCmdFunc = (*CopysetCommand)(nil) // check interface
Expand Down Expand Up @@ -72,7 +77,10 @@ func GetCopysetsStatus(caller *cobra.Command, copysetIds string, poolIds string)
fmt.Sprintf("--%s", config.CURVEFS_POOLID), poolIds,
fmt.Sprintf("--%s", config.FORMAT), config.FORMAT_NOOUT,
})
cobrautil.AlignFlagsValue(caller, checkCopyset.Cmd, []string{config.RPCRETRYTIMES, config.RPCTIMEOUT, config.CURVEFS_MDSADDR})
cobrautil.AlignFlagsValue(caller, checkCopyset.Cmd, []string{
config.RPCRETRYTIMES, config.RPCTIMEOUT, config.CURVEFS_MDSADDR,
config.CURVEFS_MARGIN,
})
checkCopyset.Cmd.SilenceErrors = true
err := checkCopyset.Cmd.Execute()
if err != nil {
Expand All @@ -89,6 +97,7 @@ func (cCmd *CopysetCommand) AddFlags() {
config.AddFsMdsAddrFlag(cCmd.Cmd)
config.AddCopysetidSliceRequiredFlag(cCmd.Cmd)
config.AddPoolidSliceRequiredFlag(cCmd.Cmd)
config.AddMarginOptionFlag(cCmd.Cmd)
}

func (cCmd *CopysetCommand) Init(cmd *cobra.Command, args []string) error {
Expand All @@ -99,14 +108,35 @@ func (cCmd *CopysetCommand) Init(cmd *cobra.Command, args []string) error {
return queryCopysetErr.ToError()
}
cCmd.Error = queryCopysetErr
header := []string{cobrautil.ROW_COPYSET_KEY, cobrautil.ROW_COPYSET_ID, cobrautil.ROW_POOL_ID, cobrautil.ROW_STATUS, cobrautil.ROW_EXPLAIN}
header := []string{cobrautil.ROW_COPYSET_KEY, cobrautil.ROW_COPYSET_ID,
cobrautil.ROW_POOL_ID, cobrautil.ROW_STATUS, cobrautil.ROW_LOG_GAP,
cobrautil.ROW_EXPLAIN,
}
cCmd.SetHeader(header)
cCmd.TableNew.SetAutoMergeCellsByColumnIndex(cobrautil.GetIndexSlice(
cCmd.Header, []string{cobrautil.ROW_COPYSET_ID, cobrautil.ROW_POOL_ID,
cobrautil.ROW_STATUS,
}))
}))
copysetKey2Status := make(map[uint64]cobrautil.COPYSET_HEALTH_STATUS)
cCmd.copysetKey2Status = &copysetKey2Status

// update leaderAddr
cCmd.leaderAddr = mapset.NewSet[string]()
for _, copysetInfo := range *cCmd.key2Copyset {
leader := copysetInfo.Info.GetLeaderPeer()
addr, err := cobrautil.PeertoAddr(leader)
if err.TypeCode() != cmderror.CODE_SUCCESS {
return err.ToError()
}
cCmd.leaderAddr.Add(addr)
}
timeout := config.GetRpcTimeout(cCmd.Cmd)
key2LeaderInfo := make(map[uint64]*CopysetLeaderInfo)
cCmd.copysetKey2LeaderInfo = &key2LeaderInfo
err := cCmd.UpdateCopysteGap(timeout)
if err.TypeCode() != cmderror.CODE_SUCCESS {
return err.ToError()
}
return nil
}

Expand All @@ -130,18 +160,46 @@ func (cCmd *CopysetCommand) RunCommand(cmd *cobra.Command, args []string) error
status, errsCheck := cobrautil.CheckCopySetHealth(v)
copysetHealthCount[status]++
row[cobrautil.ROW_STATUS] = cobrautil.CopysetHealthStatus_Str[int32(status)]
explain := ""
if status != cobrautil.COPYSET_OK {
explain := ""
for i, e := range errsCheck {
if i != len(errsCheck) {
explain += fmt.Sprintf("%s, ", e.Message)
explain += fmt.Sprintf("%s\n", e.Message)
} else {
explain += e.Message
}
errs = append(errs, e)
}
row[cobrautil.ROW_EXPLAIN] = explain
}
magrin := config.GetMarginOptionFlag(cCmd.Cmd)
leaderInfo := (*cCmd.copysetKey2LeaderInfo)[k]
if leaderInfo.Snapshot {
installSnapshot := "installing snapshot"
if len(explain) > 0 {
explain += "\n"
}
explain += installSnapshot
if row[cobrautil.ROW_STATUS] == cobrautil.COPYSET_OK_STR {
row[cobrautil.ROW_STATUS] = cobrautil.COPYSET_WARN_STR
copysetHealthCount[cobrautil.COPYSET_WARN]++
}
} else {
gap := leaderInfo.Gap
row[cobrautil.ROW_LOG_GAP] = fmt.Sprintf("%d", gap)
if gap >= magrin {
behind := fmt.Sprintf("log behind %d", gap)
if len(explain) > 0 {
explain += "\n"
}
explain += behind
if row[cobrautil.ROW_STATUS] == cobrautil.COPYSET_OK_STR {
row[cobrautil.ROW_STATUS] = cobrautil.COPYSET_WARN_STR
copysetHealthCount[cobrautil.COPYSET_WARN]++
}
}

}
row[cobrautil.ROW_EXPLAIN] = explain
}
rows = append(rows, row)
}
Expand All @@ -158,7 +216,7 @@ func (cCmd *CopysetCommand) RunCommand(cmd *cobra.Command, args []string) error
return rows[i][cobrautil.ROW_COPYSET_KEY] < rows[j][cobrautil.ROW_COPYSET_KEY]
})
list := cobrautil.ListMap2ListSortByKeys(rows, cCmd.Header, []string{
cobrautil.ROW_COPYSET_ID, cobrautil.ROW_POOL_ID, cobrautil.ROW_STATUS,
cobrautil.ROW_POOL_ID, cobrautil.ROW_STATUS, cobrautil.ROW_COPYSET_ID,
})
cCmd.TableNew.AppendBulk(list)
cCmd.Result = rows
Expand All @@ -168,3 +226,30 @@ func (cCmd *CopysetCommand) RunCommand(cmd *cobra.Command, args []string) error
func (cCmd *CopysetCommand) ResultPlainOutput() error {
return output.FinalCmdOutputPlain(&cCmd.FinalCurveCmd)
}

func (cCmd *CopysetCommand) UpdateCopysteGap(timeout time.Duration) *cmderror.CmdError {
var key2LeaderInfo sync.Map
size := config.MaxChannelSize()
errChan := make(chan *cmderror.CmdError, size)
count := 0
for iter := range cCmd.leaderAddr.Iter() {
go func(addr string) {
err := GetLeaderCopysetGap(addr, &key2LeaderInfo, timeout)
errChan <- err
}(iter)
count++
}
var errs []*cmderror.CmdError
for i := 0; i < count; i++ {
err := <-errChan
if err.TypeCode() != cmderror.CODE_SUCCESS {
errs = append(errs, err)
}
}
key2LeaderInfo.Range(func(key, value interface{}) bool {
(*cCmd.copysetKey2LeaderInfo)[key.(uint64)] = value.(*CopysetLeaderInfo)
return true
})
retErr := cmderror.MergeCmdErrorExceptSuccess(errs)
return &retErr
}
Loading

0 comments on commit 207f26b

Please sign in to comment.