Skip to content

Commit

Permalink
Impove(enter): enter leader mds directly without id option
Browse files Browse the repository at this point in the history
Signed-off-by: lyp <[email protected]>
  • Loading branch information
LYPWYT committed Dec 4, 2023
1 parent 9d83a9c commit 2a6fe64
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 8 deletions.
75 changes: 68 additions & 7 deletions cli/command/enter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,19 @@ package command

import (
"github.com/opencurve/curveadm/cli/cli"
comm "github.com/opencurve/curveadm/internal/common"
"github.com/opencurve/curveadm/internal/configure/topology"
"github.com/opencurve/curveadm/internal/errno"
"github.com/opencurve/curveadm/internal/playbook"
"github.com/opencurve/curveadm/internal/tools"
"github.com/opencurve/curveadm/internal/utils"
"github.com/spf13/cobra"
)

var (
AttachMdsLeaderContainer = []int{playbook.ATTACH_MDS_LEADER_CONTAINER}
)

type enterOptions struct {
id string
}
Expand All @@ -43,8 +49,11 @@ func NewEnterCommand(curveadm *cli.CurveAdm) *cobra.Command {
cmd := &cobra.Command{
Use: "enter ID",
Short: "Enter service container",
Args: utils.ExactArgs(1),
Args: utils.RequiresMaxArgs(1),
PreRunE: func(cmd *cobra.Command, args []string) error {
if len(args) == 0 {
return nil
}
options.id = args[0]
return curveadm.CheckId(options.id)
},
Expand All @@ -57,32 +66,84 @@ func NewEnterCommand(curveadm *cli.CurveAdm) *cobra.Command {
return cmd
}

func genMdsLeaderPlaybook(curveadm *cli.CurveAdm,
dcs []*topology.DeployConfig,
options statusOptions) (*playbook.Playbook, error) {
dcs = curveadm.FilterDeployConfig(dcs, topology.FilterOption{
Id: options.id,
Role: options.role,
Host: options.host,
})
if len(dcs) == 0 {
return nil, errno.ERR_NO_SERVICES_MATCHED
}

steps := []int{playbook.ATTACH_MDS_LEADER_CONTAINER}
pb := playbook.NewPlaybook(curveadm)
for _, step := range steps {
pb.AddStep(&playbook.PlaybookStep{
Type: step,
Configs: dcs,
ExecOptions: playbook.ExecOptions{
//Concurrency: 10,
SilentSubBar: true,
SilentMainBar: true,
SkipError: true,
},
})
}
return pb, nil
}

func runEnter(curveadm *cli.CurveAdm, options enterOptions) error {
// 1) parse cluster topology
dcs, err := curveadm.ParseTopology()
if err != nil {
return err
}
var containerId string
var dc *topology.DeployConfig
Id := options.id

// 2) filter service
// 2) If no id options, get Mds leader id
if Id == "" {
statusForMdsLeaderOptions := statusOptions{id: "*", role: ROLE_MDS, host: "*"}
pb, err := genMdsLeaderPlaybook(curveadm, dcs, statusForMdsLeaderOptions)
if err != nil {
return err
}
// run playground
err = pb.Run()
if err != nil {
return err
}
// get Mds leader id
value := curveadm.MemStorage().Get(comm.MDS_LEADER_ID)
Id = value.(string)
if Id == "" {
return errno.ERR_NO_LEADER_CONTAINER_FOUND
}
}

// 3) filter service
dcs = curveadm.FilterDeployConfig(dcs, topology.FilterOption{
Id: options.id,
Id: Id,
Role: "*",
Host: "*",
})
if len(dcs) == 0 {
return errno.ERR_NO_SERVICES_MATCHED
}

// 3) get container id
dc := dcs[0]
// 4) get container id
dc = dcs[0]
serviceId := curveadm.GetServiceId(dc.GetId())
containerId, err := curveadm.GetContainerId(serviceId)
containerId, err = curveadm.GetContainerId(serviceId)
if err != nil {
return err
}

// 4) attch remote container
// 5) attach remote container
home := dc.GetProjectLayout().ServiceRootDir
return tools.AttachRemoteContainer(curveadm, dc.GetHost(), containerId, home)
}
3 changes: 3 additions & 0 deletions internal/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ const (
SERVICE_STATUS_LOSED = "Losed"
SERVICE_STATUS_UNKNOWN = "Unknown"

// leader
MDS_LEADER_ID = "MDS_LEADER_ID"

// clean
KEY_CLEAN_ITEMS = "CLEAN_ITEMS"
KEY_CLEAN_BY_RECYCLE = "CLEAN_BY_RECYCLE"
Expand Down
3 changes: 2 additions & 1 deletion internal/errno/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ var (
ERR_UNSUPPORT_CLEAN_ITEM = EC(210005, "unsupport clean item")
ERR_NO_SERVICES_MATCHED = EC(210006, "no services matched")
// TODO: please check pool set disk type
ERR_INVALID_DISK_TYPE = EC(210007, "poolset disk type must be lowercase and can only be one of ssd, hdd and nvme")
ERR_INVALID_DISK_TYPE = EC(210007, "poolset disk type must be lowercase and can only be one of ssd, hdd and nvme")
ERR_NO_LEADER_CONTAINER_FOUND = EC(210008, "no leader container found")

// 220: commad options (client common)
ERR_UNSUPPORT_CLIENT_KIND = EC(220000, "unsupport client kind")
Expand Down
3 changes: 3 additions & 0 deletions internal/playbook/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ const (
GET_CLIENT_STATUS
INSTALL_CLIENT
UNINSTALL_CLIENT
ATTACH_MDS_LEADER_CONTAINER

// bs
FORMAT_CHUNKFILE_POOL
Expand Down Expand Up @@ -225,6 +226,8 @@ func (p *Playbook) createTasks(step *PlaybookStep) (*tasks.Tasks, error) {
t, err = comm.NewInitServiceStatusTask(curveadm, config.GetDC(i))
case GET_SERVICE_STATUS:
t, err = comm.NewGetServiceStatusTask(curveadm, config.GetDC(i))
case ATTACH_MDS_LEADER_CONTAINER:
t, err = comm.NewAttachMdsLeaderContainerTask(curveadm, config.GetDC(i))
case CLEAN_SERVICE:
t, err = comm.NewCleanServiceTask(curveadm, config.GetDC(i))
case INIT_SUPPORT:
Expand Down
94 changes: 94 additions & 0 deletions internal/task/task/common/service_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ type (
execOptions module.ExecOptions
}

step2GetMdsLeader struct {
dc *topology.DeployConfig
containerId string
isLeader *bool
execOptions module.ExecOptions
}

step2FormatServiceStatus struct {
dc *topology.DeployConfig
serviceId string
Expand All @@ -80,6 +87,12 @@ type (
memStorage *utils.SafeMap
}

step2SetMdsLeader struct {
serviceId string
isLeader *bool
memStorage *utils.SafeMap
}

ServiceStatus struct {
Id string
ParentId string
Expand Down Expand Up @@ -191,6 +204,22 @@ func (s *step2GetLeader) Execute(ctx *context.Context) error {
return nil
}

func (s *step2GetMdsLeader) Execute(ctx *context.Context) error {
dc := s.dc
if dc.GetRole() != topology.ROLE_MDS {
return nil
}

url := utils.Choose(dc.GetKind() == topology.KIND_CURVEBS,
URL_CURVEBS_METRIC_LEADER, URL_CURVEFS_METRIC_LEADER)
url = fmt.Sprintf(url, dc.GetListenIp(), dc.GetListenDummyPort())
command := fmt.Sprintf(COMMAND_CURL_MDS, url)
cmd := ctx.Module().DockerCli().ContainerExec(s.containerId, command)
out, _ := cmd.Execute(s.execOptions)
*s.isLeader = strings.Contains(out, SIGNATURE_LEADER)
return nil
}

func (s *step2FormatServiceStatus) Execute(ctx *context.Context) error {
status := *s.status
if s.containerId == comm.CLEANED_CONTAINER_ID { // container cleaned
Expand Down Expand Up @@ -218,6 +247,24 @@ func (s *step2FormatServiceStatus) Execute(ctx *context.Context) error {
return nil
}

func (s *step2SetMdsLeader) Execute(ctx *context.Context) error {
id := s.serviceId
IsLeader := *s.isLeader
s.memStorage.TX(func(kv *utils.SafeMap) error {
m := ""
v := kv.Get(comm.MDS_LEADER_ID)
if v != nil && v.(string) != "" {
return nil
}
if IsLeader {
m = id
}
kv.Set(comm.MDS_LEADER_ID, m)
return nil
})
return nil
}

func NewInitServiceStatusTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) {
serviceId := curveadm.GetServiceId(dc.GetId())
containerId, err := curveadm.GetContainerId(serviceId)
Expand Down Expand Up @@ -306,3 +353,50 @@ func NewGetServiceStatusTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig)

return t, nil
}

func NewAttachMdsLeaderContainerTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) {
serviceId := curveadm.GetServiceId(dc.GetId())
containerId, err := curveadm.GetContainerId(serviceId)
if curveadm.IsSkip(dc) {
return nil, nil
} else if err != nil {
return nil, err
}
hc, err := curveadm.GetHost(dc.GetHost())
if err != nil {
return nil, err
}

// new task
subname := fmt.Sprintf("host=%s role=%s containerId=%s",
dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId))
t := task.NewTask("Enter Leader container", subname, hc.GetSSHConfig())

// add step to task
var isLeader bool
var out string
host, role := dc.GetHost(), dc.GetRole()
t.AddStep(&step.ListContainers{
ShowAll: true,
Format: `"{{.ID}}"`,
Filter: fmt.Sprintf("id=%s", containerId),
Out: &out,
ExecOptions: curveadm.ExecOptions(),
})
t.AddStep(&step.Lambda{
Lambda: CheckContainerExist(host, role, containerId, &out),
})
t.AddStep(&step2GetMdsLeader{
dc: dc,
containerId: containerId,
isLeader: &isLeader,
execOptions: curveadm.ExecOptions(),
})
t.AddStep(&step2SetMdsLeader{
serviceId: serviceId,
isLeader: &isLeader,
memStorage: curveadm.MemStorage(),
})

return t, nil
}

0 comments on commit 2a6fe64

Please sign in to comment.