From bb50a2af2537c5cc5d48342504003538f3550d0b Mon Sep 17 00:00:00 2001 From: caoxianfei1 Date: Sun, 10 Dec 2023 12:55:51 +0800 Subject: [PATCH] Fix(migrate): support migrate chunkserver and metaserveruccessed Signed-off-by: caoxianfei1 --- cli/command/deploy.go | 4 +- cli/command/migrate.go | 105 +++++++++------ cli/command/scale_out.go | 8 +- internal/common/common.go | 18 ++- internal/configure/pool.go | 38 ++++-- internal/errno/errno.go | 6 +- internal/playbook/factory.go | 3 + internal/task/scripts/script.go | 6 + .../task/scripts/shell/get_copyset_status.sh | 46 +++++++ .../scripts/shell/mark_server_pendding.sh | 32 +++++ internal/task/task/bs/mark_server_pendding.go | 123 ++++++++++++++++++ internal/task/task/common/broadcast.go | 58 +++++++++ internal/task/task/common/create_pool.go | 19 ++- 13 files changed, 402 insertions(+), 64 deletions(-) create mode 100644 internal/task/scripts/shell/get_copyset_status.sh create mode 100644 internal/task/scripts/shell/mark_server_pendding.sh create mode 100644 internal/task/task/bs/mark_server_pendding.go create mode 100644 internal/task/task/common/broadcast.go diff --git a/cli/command/deploy.go b/cli/command/deploy.go index a193d88f3..be38b5cdc 100644 --- a/cli/command/deploy.go +++ b/cli/command/deploy.go @@ -231,7 +231,6 @@ func genDeployPlaybook(curveadm *cli.CurveAdm, Name: options.poolset, Type: options.poolsetDiskType, } - diskType := options.poolsetDiskType pb := playbook.NewPlaybook(curveadm) for _, step := range steps { @@ -255,8 +254,7 @@ func genDeployPlaybook(curveadm *cli.CurveAdm, options[comm.KEY_NUMBER_OF_CHUNKSERVER] = calcNumOfChunkserver(curveadm, dcs) } else if step == CREATE_LOGICAL_POOL { options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL - options[comm.POOLSET] = poolset - options[comm.POOLSET_DISK_TYPE] = diskType + options[comm.KEY_POOLSET] = poolset options[comm.KEY_NUMBER_OF_CHUNKSERVER] = calcNumOfChunkserver(curveadm, dcs) } diff --git a/cli/command/migrate.go b/cli/command/migrate.go index 310b29cee..a025543ef 100644 --- a/cli/command/migrate.go +++ b/cli/command/migrate.go @@ -30,7 +30,9 @@ import ( "github.com/opencurve/curveadm/internal/configure/topology" "github.com/opencurve/curveadm/internal/errno" "github.com/opencurve/curveadm/internal/playbook" - tui "github.com/opencurve/curveadm/internal/tui/common" + "github.com/opencurve/curveadm/internal/task/task/common" + tuicomm "github.com/opencurve/curveadm/internal/tui/common" + cliutil "github.com/opencurve/curveadm/internal/utils" "github.com/spf13/cobra" ) @@ -71,26 +73,23 @@ var ( // chunkserevr (curvebs) MIGRATE_CHUNKSERVER_STEPS = []int{ playbook.BACKUP_ETCD_DATA, - playbook.STOP_SERVICE, - playbook.CLEAN_SERVICE, // only container + playbook.CREATE_PHYSICAL_POOL, // add machine that migrate to playbook.PULL_IMAGE, playbook.CREATE_CONTAINER, playbook.SYNC_CONFIG, - playbook.CREATE_PHYSICAL_POOL, playbook.START_CHUNKSERVER, - playbook.CREATE_LOGICAL_POOL, + playbook.MARK_SERVER_PENGDDING, // start migrate to new server } // metaserver (curvefs) MIGRATE_METASERVER_STEPS = []int{ playbook.BACKUP_ETCD_DATA, - playbook.STOP_SERVICE, // only container - playbook.CLEAN_SERVICE, + playbook.CREATE_LOGICAL_POOL, playbook.PULL_IMAGE, playbook.CREATE_CONTAINER, playbook.SYNC_CONFIG, playbook.START_METASERVER, - playbook.CREATE_LOGICAL_POOL, + playbook.STOP_SERVICE, // start migrate to new server } MIGRATE_ROLE_STEPS = map[string][]int{ @@ -100,12 +99,21 @@ var ( topology.ROLE_SNAPSHOTCLONE: MIGRATE_SNAPSHOTCLONE_STEPS, topology.ROLE_METASERVER: MIGRATE_METASERVER_STEPS, } + + MIGRATE_POST_CLEAN_STEPS = []int{ + playbook.STOP_SERVICE, // bs + playbook.CLEAN_SERVICE, // bs, fs + playbook.CREATE_PHYSICAL_POOL, // only for chunkserver, remove server that migrate from + playbook.CREATE_LOGICAL_POOL, // only for metaserver, remove server that migrate from + playbook.UPDATE_TOPOLOGY, // bs, fs + } ) type migrateOptions struct { filename string poolset string poolsetDiskType string + clean bool } func NewMigrateCommand(curveadm *cli.CurveAdm) *cobra.Command { @@ -125,7 +133,7 @@ func NewMigrateCommand(curveadm *cli.CurveAdm) *cobra.Command { flags := cmd.Flags() flags.StringVar(&options.poolset, "poolset", "default", "Specify the poolset") flags.StringVar(&options.poolsetDiskType, "poolset-disktype", "ssd", "Specify the disk type of physical pool") - + flags.BoolVar(&options.clean, "clean", false, "Clean migrated environment for chunkserver or metaserver") return cmd } @@ -191,8 +199,21 @@ func genMigratePlaybook(curveadm *cli.CurveAdm, migrates := getMigrates(curveadm, data) role := migrates[0].From.GetRole() steps := MIGRATE_ROLE_STEPS[role] - poolset := options.poolset - poolsetDiskType := options.poolsetDiskType + + // post clean + if options.clean { + steps = MIGRATE_POST_CLEAN_STEPS + if migrates[0].From.GetKind() == common.KIND_CURVEBS { + steps = append(steps[:3], steps[4:]...) + } else { + steps = append(steps[1:2], steps[3:]...) + } + } + + poolset := configure.Poolset{ + Name: options.poolset, + Type: options.poolsetDiskType, + } pb := playbook.NewPlaybook(curveadm) for _, step := range steps { @@ -204,38 +225,40 @@ func genMigratePlaybook(curveadm *cli.CurveAdm, config = dcs2del case playbook.BACKUP_ETCD_DATA: config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_ETCD) - case CREATE_PHYSICAL_POOL, - CREATE_LOGICAL_POOL: + case + playbook.CREATE_PHYSICAL_POOL, + playbook.CREATE_LOGICAL_POOL, + playbook.MARK_SERVER_PENGDDING: config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_MDS)[:1] } // options - options := map[string]interface{}{} + optionsKV := map[string]interface{}{} switch step { case playbook.CLEAN_SERVICE: - options[comm.KEY_CLEAN_ITEMS] = []string{comm.CLEAN_ITEM_CONTAINER} - options[comm.KEY_CLEAN_BY_RECYCLE] = true + optionsKV[comm.KEY_CLEAN_ITEMS] = []string{comm.CLEAN_ITEM_CONTAINER} + optionsKV[comm.KEY_CLEAN_BY_RECYCLE] = true + optionsKV[comm.KEY_REMOVE_MIGRATED_SERVER] = true case playbook.CREATE_PHYSICAL_POOL: - options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_PHYSICAL - options[comm.KEY_MIGRATE_SERVERS] = migrates - options[comm.POOLSET] = poolset - options[comm.POOLSET_DISK_TYPE] = poolsetDiskType + optionsKV[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_PHYSICAL + optionsKV[comm.KEY_MIGRATE_SERVERS] = migrates + optionsKV[comm.KEY_POOLSET] = poolset case playbook.CREATE_LOGICAL_POOL: - options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL - options[comm.KEY_MIGRATE_SERVERS] = migrates - options[comm.KEY_NEW_TOPOLOGY_DATA] = data - options[comm.POOLSET] = poolset - options[comm.POOLSET_DISK_TYPE] = poolsetDiskType + optionsKV[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL + optionsKV[comm.KEY_MIGRATE_SERVERS] = migrates + optionsKV[comm.KEY_NEW_TOPOLOGY_DATA] = data + optionsKV[comm.KEY_IF_UPDATE_TOPOLOG] = false + optionsKV[comm.KEY_POOLSET] = poolset case playbook.UPDATE_TOPOLOGY: - options[comm.KEY_NEW_TOPOLOGY_DATA] = data + optionsKV[comm.KEY_NEW_TOPOLOGY_DATA] = data } pb.AddStep(&playbook.PlaybookStep{ - Type: step, - Configs: config, - Options: options, + Type: step, + Configs: config, + Options: optionsKV, ExecOptions: playbook.ExecOptions{ - SilentSubBar: step == playbook.UPDATE_TOPOLOGY, + // SilentSubBar: step == playbook.UPDATE_TOPOLOGY, }, }) } @@ -261,7 +284,10 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error { } // 2) read topology from file - data, err := readTopology(curveadm, options.filename) + data, err := readTopology(curveadm, + options.filename, + options.clean, + ) if err != nil { return err } @@ -272,13 +298,15 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error { return err } - // 4) display title - displayMigrateTitle(curveadm, data) + if !options.clean { + // 4) display title + displayMigrateTitle(curveadm, data) - // 5) confirm by user - if pass := tui.ConfirmYes(tui.DEFAULT_CONFIRM_PROMPT); !pass { - curveadm.WriteOutln(tui.PromptCancelOpetation("migrate service")) - return errno.ERR_CANCEL_OPERATION + // 5) confirm by user + if pass := tuicomm.ConfirmYes(tuicomm.DEFAULT_CONFIRM_PROMPT); !pass { + curveadm.WriteOutln(tuicomm.PromptCancelOpetation("migrate service")) + return errno.ERR_CANCEL_OPERATION + } } // 6) generate migrate playbook @@ -294,6 +322,9 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error { } // 9) print success prompt + if options.clean { + return nil + } curveadm.WriteOutln("") curveadm.WriteOutln(color.GreenString("Services successfully migrateed ^_^.")) // TODO(P1): warning iff there is changed configs diff --git a/cli/command/scale_out.go b/cli/command/scale_out.go index 5e703b056..cde7f8a88 100644 --- a/cli/command/scale_out.go +++ b/cli/command/scale_out.go @@ -144,7 +144,7 @@ func NewScaleOutCommand(curveadm *cli.CurveAdm) *cobra.Command { return cmd } -func readTopology(curveadm *cli.CurveAdm, filename string) (string, error) { +func readTopology(curveadm *cli.CurveAdm, filename string, clean bool) (string, error) { if !utils.PathExist(filename) { return "", errno.ERR_TOPOLOGY_FILE_NOT_FOUND. F("%s: no such file", utils.AbsPath(filename)) @@ -156,7 +156,9 @@ func readTopology(curveadm *cli.CurveAdm, filename string) (string, error) { } oldData := curveadm.ClusterTopologyData() - curveadm.WriteOut("%s", utils.Diff(oldData, data)) + if !clean { + curveadm.WriteOut("%s", utils.Diff(oldData, data)) + } return data, nil } @@ -384,7 +386,7 @@ func runScaleOut(curveadm *cli.CurveAdm, options scaleOutOptions) error { } // 2) read topology from file - data, err := readTopology(curveadm, options.filename) + data, err := readTopology(curveadm, options.filename, false) if err != nil { return err } diff --git a/internal/common/common.go b/internal/common/common.go index 8e67c6485..5e703d9e8 100644 --- a/internal/common/common.go +++ b/internal/common/common.go @@ -53,6 +53,10 @@ const ( // format KEY_ALL_FORMAT_STATUS = "ALL_FORMAT_STATUS" + // migrate + KEY_MIGRATE_STATUS = "MIGRATE_STATUS" + KEY_MIGRATE_COMMON_STATUS = "MIGRATE_COMMON_STATUS" + // check KEY_CHECK_WITH_WEAK = "CHECK_WITH_WEAK" KEY_CHECK_KERNEL_MODULE_NAME = "CHECK_KERNEL_MODULE_NAME" @@ -63,6 +67,7 @@ const ( KEY_SCALE_OUT_CLUSTER = "SCALE_OUT_CLUSTER" KEY_MIGRATE_SERVERS = "MIGRATE_SERVERS" KEY_NEW_TOPOLOGY_DATA = "NEW_TOPOLOGY_DATA" + KEY_IF_UPDATE_TOPOLOG = "IF_UPDATE_TOPOTLOY" // status KEY_ALL_SERVICE_STATUS = "ALL_SERVICE_STATUS" @@ -71,12 +76,13 @@ const ( SERVICE_STATUS_UNKNOWN = "Unknown" // clean - KEY_CLEAN_ITEMS = "CLEAN_ITEMS" - KEY_CLEAN_BY_RECYCLE = "CLEAN_BY_RECYCLE" - CLEAN_ITEM_LOG = "log" - CLEAN_ITEM_DATA = "data" - CLEAN_ITEM_CONTAINER = "container" - CLEANED_CONTAINER_ID = "-" + KEY_CLEAN_ITEMS = "CLEAN_ITEMS" + KEY_CLEAN_BY_RECYCLE = "CLEAN_BY_RECYCLE" + CLEAN_ITEM_LOG = "log" + CLEAN_ITEM_DATA = "data" + CLEAN_ITEM_CONTAINER = "container" + CLEANED_CONTAINER_ID = "-" + KEY_REMOVE_MIGRATED_SERVER = "REMOVE_MIGRATED_SERVER" // client KEY_CLIENT_HOST = "CLIENT_HOST" diff --git a/internal/configure/pool.go b/internal/configure/pool.go index b38274ba8..986ab4e95 100644 --- a/internal/configure/pool.go +++ b/internal/configure/pool.go @@ -263,26 +263,40 @@ func ScaleOutClusterPool(old *CurveClusterTopo, dcs []*topology.DeployConfig, po old.NPools = old.NPools + 1 } -func MigrateClusterServer(old *CurveClusterTopo, migrates []*MigrateServer) { +func MigrateClusterServer(old *CurveClusterTopo, migrates []*MigrateServer, removeMigratedServer bool) { m := map[string]*topology.DeployConfig{} // key: from.Name, value: to.DeployConfig for _, migrate := range migrates { m[formatName(migrate.From)] = migrate.To } - for i, server := range old.Servers { - dc, ok := m[server.Name] - if !ok { - continue + // add server that will migrate to + for fromName, toDc := range m { + server := Server{} + server.InternalIp = toDc.GetListenIp() + server.ExternalIp = toDc.GetListenExternalIp() + server.InternalPort = toDc.GetListenPort() + server.ExternalPort = toDc.GetListenExternalPort() + server.Name = formatName(toDc) + + for _, oldServer := range old.Servers { + if oldServer.Name == fromName { + server.PhysicalPool = oldServer.PhysicalPool + server.Poolset = oldServer.Poolset + server.Pool = oldServer.Pool + server.Zone = oldServer.Zone + } } + old.Servers = append(old.Servers, server) + } - server.InternalIp = dc.GetListenIp() - server.ExternalIp = dc.GetListenExternalIp() - server.Name = formatName(dc) - if server.InternalPort != 0 && server.ExternalPort != 0 { - server.InternalPort = dc.GetListenPort() - server.ExternalPort = dc.GetListenExternalPort() + // remove server that has migrated + if removeMigratedServer { + for i := 0; i < len(old.Servers); i++ { + _, ok := m[old.Servers[i].Name] + if ok { + old.Servers = append(old.Servers[:i], old.Servers[i+1:]...) + } } - old.Servers[i] = server } } diff --git a/internal/errno/errno.go b/internal/errno/errno.go index 46b8228c5..e421dbed6 100644 --- a/internal/errno/errno.go +++ b/internal/errno/errno.go @@ -398,7 +398,11 @@ var ( ERR_ENCRYPT_FILE_FAILED = EC(410021, "encrypt file failed") ERR_CLIENT_ID_NOT_FOUND = EC(410022, "client id not found") ERR_ENABLE_ETCD_AUTH_FAILED = EC(410023, "enable etcd auth failed") - + ERR_MARK_CHUNKSERVER_PENDDING = EC(410024, "mark chunkserver pendding status failed when migrate") + RRR_GET_CLUSTER_MDSADDR = EC(410025, "failed to get cluster mds addr") + ERR_GET_CHUNKSERVER_COPYSET = EC(410026, "failed to get chunkserver copyset") + ERR_GET_MIGRATE_COPYSET = EC(410027, "migrate chunkserver copyset info must be 2") + ERR_CONTAINER_NOT_REMOVED = EC(410027, "container not removed") // 420: common (curvebs client) ERR_VOLUME_ALREADY_MAPPED = EC(420000, "volume already mapped") ERR_VOLUME_CONTAINER_LOSED = EC(420001, "volume container is losed") diff --git a/internal/playbook/factory.go b/internal/playbook/factory.go index f62a7b3e5..4dc3bf437 100644 --- a/internal/playbook/factory.go +++ b/internal/playbook/factory.go @@ -93,6 +93,7 @@ const ( CREATE_VOLUME MAP_IMAGE UNMAP_IMAGE + MARK_SERVER_PENGDDING // monitor PULL_MONITOR_IMAGE @@ -275,6 +276,8 @@ func (p *Playbook) createTasks(step *PlaybookStep) (*tasks.Tasks, error) { t, err = bs.NewDeleteTargetTask(curveadm, nil) case LIST_TARGETS: t, err = bs.NewListTargetsTask(curveadm, nil) + case MARK_SERVER_PENGDDING: + t, err = bs.NewMarkServerPendding(curveadm, config.GetDC(i)) // fs case CHECK_CLIENT_S3: t, err = checker.NewClientS3ConfigureTask(curveadm, config.GetCC(i)) diff --git a/internal/task/scripts/script.go b/internal/task/scripts/script.go index 51bcc9813..3edd5f061 100644 --- a/internal/task/scripts/script.go +++ b/internal/task/scripts/script.go @@ -61,4 +61,10 @@ var ( //go:embed shell/create_fs.sh CREATE_FS string + + //go:embed shell/mark_server_pendding.sh + MARK_SERVER_PENDDING string + + //go:embed shell/get_copyset_status.sh + GET_COPYSET_STATUS string ) diff --git a/internal/task/scripts/shell/get_copyset_status.sh b/internal/task/scripts/shell/get_copyset_status.sh new file mode 100644 index 000000000..6ea89dca1 --- /dev/null +++ b/internal/task/scripts/shell/get_copyset_status.sh @@ -0,0 +1,46 @@ +#!/usr/bin/env bash + +# Usage: create_volume USER VOLUME SIZE +# Example: create_volume curve test 10 +# Created Date: 2023-12-04 +# Author: Caoxianfei(caoxianfei1) + +role=$1 +clusterMdsAddr=$2 +fromChunkserverMdsAddr=$3 +toChunkserverMdsAddr=$4 + +if [ role == "curvebs" ]; then + from_total_copyset=$(curve_ops_tool check-chunkserver -mdsAddr="$clusterMdsAddr" -chunkserverAddr="$fromChunkserverMdsAddr" | awk -v ip="$clusterMdsAddr" -v port="$fromChunkserverMdsAddr" ' + BEGIN { FS = ", "; OFS = "\n" } + $0 ~ /total/ { + split($1, arr1, ": ") + print arr1[2] + exit + }') + + if [ -z $"from_total_copyset" ]; then + echo "get $fromChunkserverMdsAddr copyset info failed" + exit1 + fi + + to_total_copyset=$(curve_ops_tool check-chunkserver -mdsAddr="$clusterMdsAddr" -chunkserverAddr="$toChunkserverMdsAddr" | awk -v ip="$clusterMdsAddr" -v port="$toChunkserverMdsAddr" ' + BEGIN { FS = ", "; OFS = "\n" } + $0 ~ /total/ { + split($1, arr1, ": ") + print arr1[2] + exit + }') + + if [ -z $"from_total_copyset" ]; then + echo "get $toChunkserverMdsAddr copyset info failed" + exit1 + fi + + echo "$from_total_copyset":"$to_total_copyset" + + exit 0 +fi + + + diff --git a/internal/task/scripts/shell/mark_server_pendding.sh b/internal/task/scripts/shell/mark_server_pendding.sh new file mode 100644 index 000000000..9f1460077 --- /dev/null +++ b/internal/task/scripts/shell/mark_server_pendding.sh @@ -0,0 +1,32 @@ +#!/usr/bin/env bash + +# Usage: create_volume USER VOLUME SIZE +# Example: create_volume curve test 10 +# Created Date: 2023-12-04 +# Author: Caoxianfei(caoxianfei1) + +IP=$1 +PORT=$2 + +CHUNKSERVER_ID=$(curve_ops_tool chunkserver-list | awk -v ip="$IP" -v port="$PORT" ' +BEGIN { FS = ", "; OFS = "\n" } +$0 ~ /chunkServerID/ { + split($3, arr1, " = ") + split($4, arr2, " = ") + if (arr1[2] == ip && arr2[2] == port) { + split($1, arr3, " = ") + print arr3[2] + exit + } +}') + +if [ -z "$CHUNKSERVER_ID" ]; then + echo "chunkserver $IP:$PORT not found" + exit 1 +fi + +/curvebs/tools/sbin/curvebs-tool -op=set_chunkserver -chunkserver_id=$CHUNKSERVER_ID -chunkserver_status=pendding +if [ $? -ne 0 ]; then + echo "failed to set chunkserver $IP:$PORT pendding status" + exit 1 +fi \ No newline at end of file diff --git a/internal/task/task/bs/mark_server_pendding.go b/internal/task/task/bs/mark_server_pendding.go new file mode 100644 index 000000000..06b350bec --- /dev/null +++ b/internal/task/task/bs/mark_server_pendding.go @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2022 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: CurveAdm + * Created Date: 2023-11-30 + * Author: Xianfei Cao (caoxianfei1) + */ + +package bs + +import ( + "fmt" + + "github.com/opencurve/curveadm/cli/cli" + comm "github.com/opencurve/curveadm/internal/common" + "github.com/opencurve/curveadm/internal/configure" + "github.com/opencurve/curveadm/internal/configure/topology" + "github.com/opencurve/curveadm/internal/errno" + "github.com/opencurve/curveadm/internal/task/context" + "github.com/opencurve/curveadm/internal/task/scripts" + "github.com/opencurve/curveadm/internal/task/step" + "github.com/opencurve/curveadm/internal/task/task" + tui "github.com/opencurve/curveadm/internal/tui/common" +) + +func CheckContainerExist(host, role, containerId string, out *string) step.LambdaType { + return func(ctx *context.Context) error { + if len(*out) == 0 { + return errno.ERR_CONTAINER_ALREADT_REMOVED. + F("host=%s role=%s containerId=%s", + host, role, tui.TrimContainerId(containerId)) + } + return nil + } +} + +func checkMarkStatus(success *bool, out *string) step.LambdaType { + return func(ctx *context.Context) error { + if !*success { + return errno.ERR_MARK_CHUNKSERVER_PENDDING.S(*out) + } + return nil + } +} + +func NewMarkServerPendding(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) { + serviceId := curveadm.GetServiceId(dc.GetId()) + containerId, err := curveadm.GetContainerId(serviceId) + if curveadm.IsSkip(dc) { + return nil, nil + } else if err != nil { + return nil, err + } + + hc, err := curveadm.GetHost(dc.GetHost()) + if err != nil { + return nil, err + } + + // new task + subname := fmt.Sprintf("host=%s role=%s containerId=%s", + dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) + t := task.NewTask("Mark Chunkserver Pendding", subname, hc.GetSSHConfig()) + + var out string + var success bool + host, role := dc.GetHost(), dc.GetRole() + layout := dc.GetProjectLayout() + markCSPenddingScript := scripts.MARK_SERVER_PENDDING + scriptPath := layout.ToolsBinDir + "/mark_server_pendding.sh" + + migrates := []*configure.MigrateServer{} + if curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS) != nil { + migrates = curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS).([]*configure.MigrateServer) + } + + t.AddStep(&step.ListContainers{ + ShowAll: true, + Format: `"{{.ID}}"`, + Filter: fmt.Sprintf("id=%s", containerId), + Out: &out, + ExecOptions: curveadm.ExecOptions(), + }) + t.AddStep(&step.Lambda{ + Lambda: CheckContainerExist(host, role, containerId, &out), + }) + t.AddStep(&step.InstallFile{ // install /curvebs/tools/sbin/mark_chunkserver_pendding.sh + ContainerId: &containerId, + ContainerDestPath: scriptPath, + Content: &markCSPenddingScript, + ExecOptions: curveadm.ExecOptions(), + }) + for _, migrate := range migrates { + hostip := migrate.From.GetListenIp() + hostport := migrate.From.GetListenPort() + t.AddStep(&step.ContainerExec{ + ContainerId: &containerId, + Command: fmt.Sprintf("/bin/bash %s %s %d", scriptPath, hostip, hostport), + Success: &success, + Out: &out, + ExecOptions: curveadm.ExecOptions(), + }) + t.AddStep(&step.Lambda{ + Lambda: checkMarkStatus(&success, &out), + }) + } + + return t, nil +} diff --git a/internal/task/task/common/broadcast.go b/internal/task/task/common/broadcast.go new file mode 100644 index 000000000..5e6220d60 --- /dev/null +++ b/internal/task/task/common/broadcast.go @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2022 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: CurveAdm + * Created Date: 2023-12-15 + * Author: Caoxianfei (caoxianfei1) + */ + +package common + +import ( + "fmt" + + "github.com/opencurve/curveadm/cli/cli" + "github.com/opencurve/curveadm/internal/configure/topology" + "github.com/opencurve/curveadm/internal/task/step" + "github.com/opencurve/curveadm/internal/task/task" + tui "github.com/opencurve/curveadm/internal/tui/common" +) + +func NewBroadCastMdsAddrsTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) { + serviceId := curveadm.GetServiceId(dc.GetId()) + containerId, err := curveadm.GetContainerId(serviceId) + if curveadm.IsSkip(dc) { + return nil, nil + } else if err != nil { + return nil, err + } + hc, err := curveadm.GetHost(dc.GetHost()) + if err != nil { + return nil, err + } + + subname := fmt.Sprintf("host=%s role=%s containerId=%s", + dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) + t := task.NewTask("Backup Etcd Data", subname, hc.GetSSHConfig()) + + t.AddStep(&step.ContainerExec{ + ContainerId: &containerId, + Command: genBackupCommand(dc), + ExecOptions: curveadm.ExecOptions(), + }) + return t, nil +} diff --git a/internal/task/task/common/create_pool.go b/internal/task/task/common/create_pool.go index 3adbbc200..e784d8e10 100644 --- a/internal/task/task/common/create_pool.go +++ b/internal/task/task/common/create_pool.go @@ -110,7 +110,14 @@ func prepare(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (clusterPoolJson configure.ScaleOutClusterPool(&clusterPool, dcs, poolset) } else if curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS) != nil { // migrate servers migrates := curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS).([]*configure.MigrateServer) - configure.MigrateClusterServer(&clusterPool, migrates) + var removeMigratedServer bool + if curveadm.MemStorage().Get(comm.KEY_REMOVE_MIGRATED_SERVER) == nil { + removeMigratedServer = false + } else if curveadm.MemStorage().Get(comm.KEY_REMOVE_MIGRATED_SERVER) != nil && + curveadm.MemStorage().Get(comm.KEY_REMOVE_MIGRATED_SERVER).(bool) == true { + removeMigratedServer = true + } + configure.MigrateClusterServer(&clusterPool, migrates, removeMigratedServer) } // 3. encode cluster pool to json string @@ -217,6 +224,14 @@ func NewCreateTopologyTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (* build.DEBUG(build.DEBUG_CREATE_POOL, build.Field{"pool json", clusterPoolJson}) + var setClusterPool bool + value := curveadm.MemStorage().Get(comm.KEY_IF_UPDATE_TOPOLOG) + if value != nil { + setClusterPool = false + } else { + setClusterPool = true + } + t.AddStep(&step.ListContainers{ ShowAll: true, Format: `"{{.ID}}"`, @@ -281,7 +296,7 @@ func NewCreateTopologyTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (* t.AddStep(&step.Lambda{ Lambda: checkCreatePoolStatus(&success, &out), }) - if pooltype == comm.POOL_TYPE_LOGICAL { + if pooltype == comm.POOL_TYPE_LOGICAL && setClusterPool { t.AddStep(&step2SetClusterPool{ curveadm: curveadm, clusterPool: clusterPoolJson,