From b1a3a4edacc264ed5a221c57660ec722b64a5e88 Mon Sep 17 00:00:00 2001 From: caoxianfei1 Date: Sun, 10 Dec 2023 12:55:51 +0800 Subject: [PATCH 1/2] Fix(migrate): support migrate chunkserver and metaserver successed Signed-off-by: caoxianfei1 --- cli/command/deploy.go | 4 +- cli/command/migrate.go | 105 +++++++++------ cli/command/scale_out.go | 8 +- internal/common/common.go | 18 ++- internal/configure/pool.go | 38 ++++-- internal/errno/errno.go | 6 +- internal/playbook/factory.go | 3 + internal/task/scripts/script.go | 3 + .../scripts/shell/mark_server_pendding.sh | 32 +++++ internal/task/task/bs/mark_server_pendding.go | 123 ++++++++++++++++++ internal/task/task/common/create_pool.go | 17 ++- 11 files changed, 293 insertions(+), 64 deletions(-) create mode 100644 internal/task/scripts/shell/mark_server_pendding.sh create mode 100644 internal/task/task/bs/mark_server_pendding.go diff --git a/cli/command/deploy.go b/cli/command/deploy.go index a193d88f3..be38b5cdc 100644 --- a/cli/command/deploy.go +++ b/cli/command/deploy.go @@ -231,7 +231,6 @@ func genDeployPlaybook(curveadm *cli.CurveAdm, Name: options.poolset, Type: options.poolsetDiskType, } - diskType := options.poolsetDiskType pb := playbook.NewPlaybook(curveadm) for _, step := range steps { @@ -255,8 +254,7 @@ func genDeployPlaybook(curveadm *cli.CurveAdm, options[comm.KEY_NUMBER_OF_CHUNKSERVER] = calcNumOfChunkserver(curveadm, dcs) } else if step == CREATE_LOGICAL_POOL { options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL - options[comm.POOLSET] = poolset - options[comm.POOLSET_DISK_TYPE] = diskType + options[comm.KEY_POOLSET] = poolset options[comm.KEY_NUMBER_OF_CHUNKSERVER] = calcNumOfChunkserver(curveadm, dcs) } diff --git a/cli/command/migrate.go b/cli/command/migrate.go index 310b29cee..a025543ef 100644 --- a/cli/command/migrate.go +++ b/cli/command/migrate.go @@ -30,7 +30,9 @@ import ( "github.com/opencurve/curveadm/internal/configure/topology" "github.com/opencurve/curveadm/internal/errno" 
"github.com/opencurve/curveadm/internal/playbook" - tui "github.com/opencurve/curveadm/internal/tui/common" + "github.com/opencurve/curveadm/internal/task/task/common" + tuicomm "github.com/opencurve/curveadm/internal/tui/common" + cliutil "github.com/opencurve/curveadm/internal/utils" "github.com/spf13/cobra" ) @@ -71,26 +73,23 @@ var ( // chunkserevr (curvebs) MIGRATE_CHUNKSERVER_STEPS = []int{ playbook.BACKUP_ETCD_DATA, - playbook.STOP_SERVICE, - playbook.CLEAN_SERVICE, // only container + playbook.CREATE_PHYSICAL_POOL, // add machine that migrate to playbook.PULL_IMAGE, playbook.CREATE_CONTAINER, playbook.SYNC_CONFIG, - playbook.CREATE_PHYSICAL_POOL, playbook.START_CHUNKSERVER, - playbook.CREATE_LOGICAL_POOL, + playbook.MARK_SERVER_PENGDDING, // start migrate to new server } // metaserver (curvefs) MIGRATE_METASERVER_STEPS = []int{ playbook.BACKUP_ETCD_DATA, - playbook.STOP_SERVICE, // only container - playbook.CLEAN_SERVICE, + playbook.CREATE_LOGICAL_POOL, playbook.PULL_IMAGE, playbook.CREATE_CONTAINER, playbook.SYNC_CONFIG, playbook.START_METASERVER, - playbook.CREATE_LOGICAL_POOL, + playbook.STOP_SERVICE, // start migrate to new server } MIGRATE_ROLE_STEPS = map[string][]int{ @@ -100,12 +99,21 @@ var ( topology.ROLE_SNAPSHOTCLONE: MIGRATE_SNAPSHOTCLONE_STEPS, topology.ROLE_METASERVER: MIGRATE_METASERVER_STEPS, } + + MIGRATE_POST_CLEAN_STEPS = []int{ + playbook.STOP_SERVICE, // bs + playbook.CLEAN_SERVICE, // bs, fs + playbook.CREATE_PHYSICAL_POOL, // only for chunkserver, remove server that migrate from + playbook.CREATE_LOGICAL_POOL, // only for metaserver, remove server that migrate from + playbook.UPDATE_TOPOLOGY, // bs, fs + } ) type migrateOptions struct { filename string poolset string poolsetDiskType string + clean bool } func NewMigrateCommand(curveadm *cli.CurveAdm) *cobra.Command { @@ -125,7 +133,7 @@ func NewMigrateCommand(curveadm *cli.CurveAdm) *cobra.Command { flags := cmd.Flags() flags.StringVar(&options.poolset, "poolset", "default", "Specify 
the poolset") flags.StringVar(&options.poolsetDiskType, "poolset-disktype", "ssd", "Specify the disk type of physical pool") - + flags.BoolVar(&options.clean, "clean", false, "Clean migrated environment for chunkserver or metaserver") return cmd } @@ -191,8 +199,21 @@ func genMigratePlaybook(curveadm *cli.CurveAdm, migrates := getMigrates(curveadm, data) role := migrates[0].From.GetRole() steps := MIGRATE_ROLE_STEPS[role] - poolset := options.poolset - poolsetDiskType := options.poolsetDiskType + + // post clean + if options.clean { + steps = MIGRATE_POST_CLEAN_STEPS + if migrates[0].From.GetKind() == common.KIND_CURVEBS { + steps = append(steps[:3], steps[4:]...) + } else { + steps = append(steps[1:2], steps[3:]...) + } + } + + poolset := configure.Poolset{ + Name: options.poolset, + Type: options.poolsetDiskType, + } pb := playbook.NewPlaybook(curveadm) for _, step := range steps { @@ -204,38 +225,40 @@ func genMigratePlaybook(curveadm *cli.CurveAdm, config = dcs2del case playbook.BACKUP_ETCD_DATA: config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_ETCD) - case CREATE_PHYSICAL_POOL, - CREATE_LOGICAL_POOL: + case + playbook.CREATE_PHYSICAL_POOL, + playbook.CREATE_LOGICAL_POOL, + playbook.MARK_SERVER_PENGDDING: config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_MDS)[:1] } // options - options := map[string]interface{}{} + optionsKV := map[string]interface{}{} switch step { case playbook.CLEAN_SERVICE: - options[comm.KEY_CLEAN_ITEMS] = []string{comm.CLEAN_ITEM_CONTAINER} - options[comm.KEY_CLEAN_BY_RECYCLE] = true + optionsKV[comm.KEY_CLEAN_ITEMS] = []string{comm.CLEAN_ITEM_CONTAINER} + optionsKV[comm.KEY_CLEAN_BY_RECYCLE] = true + optionsKV[comm.KEY_REMOVE_MIGRATED_SERVER] = true case playbook.CREATE_PHYSICAL_POOL: - options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_PHYSICAL - options[comm.KEY_MIGRATE_SERVERS] = migrates - options[comm.POOLSET] = poolset - options[comm.POOLSET_DISK_TYPE] = poolsetDiskType + 
optionsKV[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_PHYSICAL + optionsKV[comm.KEY_MIGRATE_SERVERS] = migrates + optionsKV[comm.KEY_POOLSET] = poolset case playbook.CREATE_LOGICAL_POOL: - options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL - options[comm.KEY_MIGRATE_SERVERS] = migrates - options[comm.KEY_NEW_TOPOLOGY_DATA] = data - options[comm.POOLSET] = poolset - options[comm.POOLSET_DISK_TYPE] = poolsetDiskType + optionsKV[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL + optionsKV[comm.KEY_MIGRATE_SERVERS] = migrates + optionsKV[comm.KEY_NEW_TOPOLOGY_DATA] = data + optionsKV[comm.KEY_IF_UPDATE_TOPOLOG] = false + optionsKV[comm.KEY_POOLSET] = poolset case playbook.UPDATE_TOPOLOGY: - options[comm.KEY_NEW_TOPOLOGY_DATA] = data + optionsKV[comm.KEY_NEW_TOPOLOGY_DATA] = data } pb.AddStep(&playbook.PlaybookStep{ - Type: step, - Configs: config, - Options: options, + Type: step, + Configs: config, + Options: optionsKV, ExecOptions: playbook.ExecOptions{ - SilentSubBar: step == playbook.UPDATE_TOPOLOGY, + // SilentSubBar: step == playbook.UPDATE_TOPOLOGY, }, }) } @@ -261,7 +284,10 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error { } // 2) read topology from file - data, err := readTopology(curveadm, options.filename) + data, err := readTopology(curveadm, + options.filename, + options.clean, + ) if err != nil { return err } @@ -272,13 +298,15 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error { return err } - // 4) display title - displayMigrateTitle(curveadm, data) + if !options.clean { + // 4) display title + displayMigrateTitle(curveadm, data) - // 5) confirm by user - if pass := tui.ConfirmYes(tui.DEFAULT_CONFIRM_PROMPT); !pass { - curveadm.WriteOutln(tui.PromptCancelOpetation("migrate service")) - return errno.ERR_CANCEL_OPERATION + // 5) confirm by user + if pass := tuicomm.ConfirmYes(tuicomm.DEFAULT_CONFIRM_PROMPT); !pass { + curveadm.WriteOutln(tuicomm.PromptCancelOpetation("migrate service")) + return 
errno.ERR_CANCEL_OPERATION + } } // 6) generate migrate playbook @@ -294,6 +322,9 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error { } // 9) print success prompt + if options.clean { + return nil + } curveadm.WriteOutln("") curveadm.WriteOutln(color.GreenString("Services successfully migrateed ^_^.")) // TODO(P1): warning iff there is changed configs diff --git a/cli/command/scale_out.go b/cli/command/scale_out.go index 5e703b056..cde7f8a88 100644 --- a/cli/command/scale_out.go +++ b/cli/command/scale_out.go @@ -144,7 +144,7 @@ func NewScaleOutCommand(curveadm *cli.CurveAdm) *cobra.Command { return cmd } -func readTopology(curveadm *cli.CurveAdm, filename string) (string, error) { +func readTopology(curveadm *cli.CurveAdm, filename string, clean bool) (string, error) { if !utils.PathExist(filename) { return "", errno.ERR_TOPOLOGY_FILE_NOT_FOUND. F("%s: no such file", utils.AbsPath(filename)) @@ -156,7 +156,9 @@ func readTopology(curveadm *cli.CurveAdm, filename string) (string, error) { } oldData := curveadm.ClusterTopologyData() - curveadm.WriteOut("%s", utils.Diff(oldData, data)) + if !clean { + curveadm.WriteOut("%s", utils.Diff(oldData, data)) + } return data, nil } @@ -384,7 +386,7 @@ func runScaleOut(curveadm *cli.CurveAdm, options scaleOutOptions) error { } // 2) read topology from file - data, err := readTopology(curveadm, options.filename) + data, err := readTopology(curveadm, options.filename, false) if err != nil { return err } diff --git a/internal/common/common.go b/internal/common/common.go index ff726c663..0af1bb19e 100644 --- a/internal/common/common.go +++ b/internal/common/common.go @@ -54,6 +54,10 @@ const ( // format KEY_ALL_FORMAT_STATUS = "ALL_FORMAT_STATUS" + // migrate + KEY_MIGRATE_STATUS = "MIGRATE_STATUS" + KEY_MIGRATE_COMMON_STATUS = "MIGRATE_COMMON_STATUS" + // check KEY_CHECK_WITH_WEAK = "CHECK_WITH_WEAK" KEY_CHECK_KERNEL_MODULE_NAME = "CHECK_KERNEL_MODULE_NAME" @@ -64,6 +68,7 @@ const ( KEY_SCALE_OUT_CLUSTER = 
"SCALE_OUT_CLUSTER" KEY_MIGRATE_SERVERS = "MIGRATE_SERVERS" KEY_NEW_TOPOLOGY_DATA = "NEW_TOPOLOGY_DATA" + KEY_IF_UPDATE_TOPOLOG = "IF_UPDATE_TOPOTLOY" // status KEY_ALL_SERVICE_STATUS = "ALL_SERVICE_STATUS" @@ -72,12 +77,13 @@ const ( SERVICE_STATUS_UNKNOWN = "Unknown" // clean - KEY_CLEAN_ITEMS = "CLEAN_ITEMS" - KEY_CLEAN_BY_RECYCLE = "CLEAN_BY_RECYCLE" - CLEAN_ITEM_LOG = "log" - CLEAN_ITEM_DATA = "data" - CLEAN_ITEM_CONTAINER = "container" - CLEANED_CONTAINER_ID = "-" + KEY_CLEAN_ITEMS = "CLEAN_ITEMS" + KEY_CLEAN_BY_RECYCLE = "CLEAN_BY_RECYCLE" + CLEAN_ITEM_LOG = "log" + CLEAN_ITEM_DATA = "data" + CLEAN_ITEM_CONTAINER = "container" + CLEANED_CONTAINER_ID = "-" + KEY_REMOVE_MIGRATED_SERVER = "REMOVE_MIGRATED_SERVER" // client KEY_CLIENT_HOST = "CLIENT_HOST" diff --git a/internal/configure/pool.go b/internal/configure/pool.go index b38274ba8..986ab4e95 100644 --- a/internal/configure/pool.go +++ b/internal/configure/pool.go @@ -263,26 +263,40 @@ func ScaleOutClusterPool(old *CurveClusterTopo, dcs []*topology.DeployConfig, po old.NPools = old.NPools + 1 } -func MigrateClusterServer(old *CurveClusterTopo, migrates []*MigrateServer) { +func MigrateClusterServer(old *CurveClusterTopo, migrates []*MigrateServer, removeMigratedServer bool) { m := map[string]*topology.DeployConfig{} // key: from.Name, value: to.DeployConfig for _, migrate := range migrates { m[formatName(migrate.From)] = migrate.To } - for i, server := range old.Servers { - dc, ok := m[server.Name] - if !ok { - continue + // add server that will migrate to + for fromName, toDc := range m { + server := Server{} + server.InternalIp = toDc.GetListenIp() + server.ExternalIp = toDc.GetListenExternalIp() + server.InternalPort = toDc.GetListenPort() + server.ExternalPort = toDc.GetListenExternalPort() + server.Name = formatName(toDc) + + for _, oldServer := range old.Servers { + if oldServer.Name == fromName { + server.PhysicalPool = oldServer.PhysicalPool + server.Poolset = oldServer.Poolset + server.Pool = 
oldServer.Pool + server.Zone = oldServer.Zone + } } + old.Servers = append(old.Servers, server) + } - server.InternalIp = dc.GetListenIp() - server.ExternalIp = dc.GetListenExternalIp() - server.Name = formatName(dc) - if server.InternalPort != 0 && server.ExternalPort != 0 { - server.InternalPort = dc.GetListenPort() - server.ExternalPort = dc.GetListenExternalPort() + // remove server that has migrated + if removeMigratedServer { + for i := 0; i < len(old.Servers); i++ { + _, ok := m[old.Servers[i].Name] + if ok { + old.Servers = append(old.Servers[:i], old.Servers[i+1:]...) + } } - old.Servers[i] = server } } diff --git a/internal/errno/errno.go b/internal/errno/errno.go index 2b6ec0ca7..117460bdf 100644 --- a/internal/errno/errno.go +++ b/internal/errno/errno.go @@ -399,7 +399,11 @@ var ( ERR_ENCRYPT_FILE_FAILED = EC(410021, "encrypt file failed") ERR_CLIENT_ID_NOT_FOUND = EC(410022, "client id not found") ERR_ENABLE_ETCD_AUTH_FAILED = EC(410023, "enable etcd auth failed") - + ERR_MARK_CHUNKSERVER_PENDDING = EC(410024, "mark chunkserver pendding status failed when migrate") + RRR_GET_CLUSTER_MDSADDR = EC(410025, "failed to get cluster mds addr") + ERR_GET_CHUNKSERVER_COPYSET = EC(410026, "failed to get chunkserver copyset") + ERR_GET_MIGRATE_COPYSET = EC(410027, "migrate chunkserver copyset info must be 2") + ERR_CONTAINER_NOT_REMOVED = EC(410027, "container not removed") // 420: common (curvebs client) ERR_VOLUME_ALREADY_MAPPED = EC(420000, "volume already mapped") ERR_VOLUME_CONTAINER_LOSED = EC(420001, "volume container is losed") diff --git a/internal/playbook/factory.go b/internal/playbook/factory.go index b4a52e4f5..b2c699fda 100644 --- a/internal/playbook/factory.go +++ b/internal/playbook/factory.go @@ -94,6 +94,7 @@ const ( CREATE_VOLUME MAP_IMAGE UNMAP_IMAGE + MARK_SERVER_PENGDDING // monitor PULL_MONITOR_IMAGE @@ -278,6 +279,8 @@ func (p *Playbook) createTasks(step *PlaybookStep) (*tasks.Tasks, error) { t, err = bs.NewDeleteTargetTask(curveadm, nil) 
case LIST_TARGETS: t, err = bs.NewListTargetsTask(curveadm, nil) + case MARK_SERVER_PENGDDING: + t, err = bs.NewMarkServerPendding(curveadm, config.GetDC(i)) // fs case CHECK_CLIENT_S3: t, err = checker.NewClientS3ConfigureTask(curveadm, config.GetCC(i)) diff --git a/internal/task/scripts/script.go b/internal/task/scripts/script.go index 51bcc9813..e310d42e6 100644 --- a/internal/task/scripts/script.go +++ b/internal/task/scripts/script.go @@ -61,4 +61,7 @@ var ( //go:embed shell/create_fs.sh CREATE_FS string + + //go:embed shell/mark_server_pendding.sh + MARK_SERVER_PENDDING string ) diff --git a/internal/task/scripts/shell/mark_server_pendding.sh b/internal/task/scripts/shell/mark_server_pendding.sh new file mode 100644 index 000000000..468406bac --- /dev/null +++ b/internal/task/scripts/shell/mark_server_pendding.sh @@ -0,0 +1,32 @@ +#!/usr/bin/env bash + +# Usage: bash mark_server_pendding.sh CHUNKIP CHUNKPORT +# Example: bash mark_server_pendding.sh 127.0.0.1 18200 +# Created Date: 2023-12-04 +# Author: Caoxianfei(caoxianfei1) + +IP=$1 +PORT=$2 + +CHUNKSERVER_ID=$(curve_ops_tool chunkserver-list | awk -v ip="$IP" -v port="$PORT" ' +BEGIN { FS = ", "; OFS = "\n" } +$0 ~ /chunkServerID/ { + split($3, arr1, " = ") + split($4, arr2, " = ") + if (arr1[2] == ip && arr2[2] == port) { + split($1, arr3, " = ") + print arr3[2] + exit + } +}') + +if [ -z "$CHUNKSERVER_ID" ]; then + echo "chunkserver $IP:$PORT not found" + exit 1 +fi + +/curvebs/tools/sbin/curvebs-tool -op=set_chunkserver -chunkserver_id=$CHUNKSERVER_ID -chunkserver_status=pendding +if [ $? -ne 0 ]; then + echo "failed to set chunkserver $IP:$PORT pendding status" + exit 1 +fi \ No newline at end of file diff --git a/internal/task/task/bs/mark_server_pendding.go b/internal/task/task/bs/mark_server_pendding.go new file mode 100644 index 000000000..6cd4417b4 --- /dev/null +++ b/internal/task/task/bs/mark_server_pendding.go @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2023 NetEase Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: CurveAdm + * Created Date: 2023-11-30 + * Author: Xianfei Cao (caoxianfei1) + */ + +package bs + +import ( + "fmt" + + "github.com/opencurve/curveadm/cli/cli" + comm "github.com/opencurve/curveadm/internal/common" + "github.com/opencurve/curveadm/internal/configure" + "github.com/opencurve/curveadm/internal/configure/topology" + "github.com/opencurve/curveadm/internal/errno" + "github.com/opencurve/curveadm/internal/task/context" + "github.com/opencurve/curveadm/internal/task/scripts" + "github.com/opencurve/curveadm/internal/task/step" + "github.com/opencurve/curveadm/internal/task/task" + tui "github.com/opencurve/curveadm/internal/tui/common" +) + +func CheckContainerExist(host, role, containerId string, out *string) step.LambdaType { + return func(ctx *context.Context) error { + if len(*out) == 0 { + return errno.ERR_CONTAINER_ALREADT_REMOVED. 
+ F("host=%s role=%s containerId=%s", + host, role, tui.TrimContainerId(containerId)) + } + return nil + } +} + +func checkMarkStatus(success *bool, out *string) step.LambdaType { + return func(ctx *context.Context) error { + if !*success { + return errno.ERR_MARK_CHUNKSERVER_PENDDING.S(*out) + } + return nil + } +} + +func NewMarkServerPendding(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) { + serviceId := curveadm.GetServiceId(dc.GetId()) + containerId, err := curveadm.GetContainerId(serviceId) + if curveadm.IsSkip(dc) { + return nil, nil + } else if err != nil { + return nil, err + } + + hc, err := curveadm.GetHost(dc.GetHost()) + if err != nil { + return nil, err + } + + // new task + subname := fmt.Sprintf("host=%s role=%s containerId=%s", + dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) + t := task.NewTask("Mark Chunkserver Pendding", subname, hc.GetSSHConfig()) + + var out string + var success bool + host, role := dc.GetHost(), dc.GetRole() + layout := dc.GetProjectLayout() + markCSPenddingScript := scripts.MARK_SERVER_PENDDING + scriptPath := layout.ToolsBinDir + "/mark_server_pendding.sh" + + migrates := []*configure.MigrateServer{} + if curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS) != nil { + migrates = curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS).([]*configure.MigrateServer) + } + + t.AddStep(&step.ListContainers{ + ShowAll: true, + Format: `"{{.ID}}"`, + Filter: fmt.Sprintf("id=%s", containerId), + Out: &out, + ExecOptions: curveadm.ExecOptions(), + }) + t.AddStep(&step.Lambda{ + Lambda: CheckContainerExist(host, role, containerId, &out), + }) + t.AddStep(&step.InstallFile{ // install /curvebs/tools/sbin/mark_chunkserver_pendding.sh + ContainerId: &containerId, + ContainerDestPath: scriptPath, + Content: &markCSPenddingScript, + ExecOptions: curveadm.ExecOptions(), + }) + for _, migrate := range migrates { + hostip := migrate.From.GetListenIp() + hostport := migrate.From.GetListenPort() + 
t.AddStep(&step.ContainerExec{ + ContainerId: &containerId, + Command: fmt.Sprintf("/bin/bash %s %s %d", scriptPath, hostip, hostport), + Success: &success, + Out: &out, + ExecOptions: curveadm.ExecOptions(), + }) + t.AddStep(&step.Lambda{ + Lambda: checkMarkStatus(&success, &out), + }) + } + + return t, nil +} diff --git a/internal/task/task/common/create_pool.go b/internal/task/task/common/create_pool.go index 3adbbc200..99d41fd9f 100644 --- a/internal/task/task/common/create_pool.go +++ b/internal/task/task/common/create_pool.go @@ -110,7 +110,14 @@ func prepare(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (clusterPoolJson configure.ScaleOutClusterPool(&clusterPool, dcs, poolset) } else if curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS) != nil { // migrate servers migrates := curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS).([]*configure.MigrateServer) - configure.MigrateClusterServer(&clusterPool, migrates) + var removeMigratedServer bool + if curveadm.MemStorage().Get(comm.KEY_REMOVE_MIGRATED_SERVER) == nil { + removeMigratedServer = false + } else if curveadm.MemStorage().Get(comm.KEY_REMOVE_MIGRATED_SERVER) != nil && + curveadm.MemStorage().Get(comm.KEY_REMOVE_MIGRATED_SERVER).(bool) == true { + removeMigratedServer = true + } + configure.MigrateClusterServer(&clusterPool, migrates, removeMigratedServer) } // 3. 
encode cluster pool to json string @@ -217,6 +224,12 @@ func NewCreateTopologyTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (* build.DEBUG(build.DEBUG_CREATE_POOL, build.Field{"pool json", clusterPoolJson}) + var setClusterPool bool + value := curveadm.MemStorage().Get(comm.KEY_IF_UPDATE_TOPOLOG) + if value == nil { + setClusterPool = true + } + t.AddStep(&step.ListContainers{ ShowAll: true, Format: `"{{.ID}}"`, @@ -281,7 +294,7 @@ func NewCreateTopologyTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (* t.AddStep(&step.Lambda{ Lambda: checkCreatePoolStatus(&success, &out), }) - if pooltype == comm.POOL_TYPE_LOGICAL { + if pooltype == comm.POOL_TYPE_LOGICAL && setClusterPool { t.AddStep(&step2SetClusterPool{ curveadm: curveadm, clusterPool: clusterPoolJson, From aace2bbdf88bd97b7608c18c8c54a9f34f9aca14 Mon Sep 17 00:00:00 2001 From: caoxianfei1 Date: Tue, 2 Jan 2024 13:57:31 +0800 Subject: [PATCH 2/2] Fix(migrate): support migrate other service --- cli/command/migrate.go | 42 ++++- internal/common/common.go | 1 + internal/configure/topology/dc_get.go | 6 +- internal/configure/topology/variables.go | 4 +- internal/errno/errno.go | 3 + internal/playbook/factory.go | 12 ++ internal/task/scripts/enable_etcd_auth.go | 4 +- internal/task/scripts/script.go | 4 + internal/task/scripts/shell/add_etcd.sh | 47 ++++++ internal/task/scripts/shell/remove_etcd.sh | 33 ++++ internal/task/task/common/add_etcd_mem.go | 119 +++++++++++++ .../task/task/common/amend_etcd_config.go | 119 +++++++++++++ .../task/task/common/amend_server_config.go | 157 ++++++++++++++++++ internal/task/task/common/remove_etcd_mem.go | 110 ++++++++++++ 14 files changed, 650 insertions(+), 11 deletions(-) create mode 100644 internal/task/scripts/shell/add_etcd.sh create mode 100644 internal/task/scripts/shell/remove_etcd.sh create mode 100644 internal/task/task/common/add_etcd_mem.go create mode 100644 internal/task/task/common/amend_etcd_config.go create mode 100644 
internal/task/task/common/amend_server_config.go create mode 100644 internal/task/task/common/remove_etcd_mem.go diff --git a/cli/command/migrate.go b/cli/command/migrate.go index a025543ef..f2abdfd9c 100644 --- a/cli/command/migrate.go +++ b/cli/command/migrate.go @@ -39,23 +39,30 @@ import ( var ( MIGRATE_ETCD_STEPS = []int{ - playbook.STOP_SERVICE, - playbook.CLEAN_SERVICE, // only container + playbook.ADD_ETCD_MEMBER, playbook.PULL_IMAGE, playbook.CREATE_CONTAINER, playbook.SYNC_CONFIG, + playbook.AMEND_ETCD_CONFIG, playbook.START_ETCD, + playbook.REMOVE_ETCD_MEMBER, + playbook.AMEND_SERVER_CONFIG, // modify the etcd endpoint in mds.conf + playbook.RESTART_SERVICE, // restart all mds then modify the etcd endpoint + playbook.STOP_SERVICE, + playbook.CLEAN_SERVICE, // only container playbook.UPDATE_TOPOLOGY, } // mds MIGRATE_MDS_STEPS = []int{ - playbook.STOP_SERVICE, - playbook.CLEAN_SERVICE, // only container playbook.PULL_IMAGE, playbook.CREATE_CONTAINER, playbook.SYNC_CONFIG, playbook.START_MDS, + playbook.AMEND_SERVER_CONFIG, // modify the mds.listen.addr in metaserver.conf + playbook.RESTART_SERVICE, // restart all metaserver then modify the mds.listen.addr + playbook.STOP_SERVICE, + playbook.CLEAN_SERVICE, // only container playbook.UPDATE_TOPOLOGY, } @@ -67,6 +74,8 @@ var ( playbook.CREATE_CONTAINER, playbook.SYNC_CONFIG, playbook.START_SNAPSHOTCLONE, + playbook.AMEND_SERVER_CONFIG, // modify the mds.listen.addr in metaserver.conf + playbook.RESTART_SERVICE, // restart all metaserver then modify the mds.listen.addr playbook.UPDATE_TOPOLOGY, } @@ -157,7 +166,7 @@ func checkMigrateTopology(curveadm *cli.CurveAdm, data string) error { } else if len(dcs2add) < len(dcs2del) { return errno.ERR_DELETE_SERVICE_WHILE_MIGRATING_IS_DENIED } - // len(dcs2add) == len(dcs2del) + if len(dcs2add) == 0 { return errno.ERR_NO_SERVICES_FOR_MIGRATING } @@ -199,6 +208,7 @@ func genMigratePlaybook(curveadm *cli.CurveAdm, migrates := getMigrates(curveadm, data) role := 
migrates[0].From.GetRole() steps := MIGRATE_ROLE_STEPS[role] + etcdDCs := curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_ETCD) // post clean if options.clean { @@ -221,10 +231,25 @@ func genMigratePlaybook(curveadm *cli.CurveAdm, config := dcs2add switch step { case playbook.STOP_SERVICE, - playbook.CLEAN_SERVICE: + playbook.CLEAN_SERVICE, + playbook.ADD_ETCD_MEMBER, + playbook.REMOVE_ETCD_MEMBER: config = dcs2del case playbook.BACKUP_ETCD_DATA: config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_ETCD) + // 1. migrate etcd, need to override mds config and restart all mds + // 2. (FS)migrate mds, need to override metaserver config and restart all metaservers + // 3. (BS)migrate mds, need to override chunkserver and snapshot config and restart all chunkservers and snapshotclones + case playbook.AMEND_SERVER_CONFIG, + playbook.RESTART_SERVICE: + if role == topology.ROLE_ETCD { + config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_MDS) + } else if role == topology.ROLE_MDS && dcs[0].GetKind() == topology.KIND_CURVEFS { + config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_METASERVER) + } else if role == topology.ROLE_MDS && dcs[0].GetKind() == topology.KIND_CURVEBS { + config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_CHUNKSERVER) + config = append(config, curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_SNAPSHOTCLONE)...) 
+ } case playbook.CREATE_PHYSICAL_POOL, playbook.CREATE_LOGICAL_POOL, @@ -251,6 +276,11 @@ func genMigratePlaybook(curveadm *cli.CurveAdm, optionsKV[comm.KEY_POOLSET] = poolset case playbook.UPDATE_TOPOLOGY: optionsKV[comm.KEY_NEW_TOPOLOGY_DATA] = data + case playbook.ADD_ETCD_MEMBER, + playbook.AMEND_ETCD_CONFIG, + playbook.AMEND_SERVER_CONFIG: + optionsKV[comm.KEY_MIGRATE_SERVERS] = migrates + optionsKV[comm.KEY_CLUSTER_DCS] = etcdDCs } pb.AddStep(&playbook.PlaybookStep{ diff --git a/internal/common/common.go b/internal/common/common.go index 0af1bb19e..ae597ee08 100644 --- a/internal/common/common.go +++ b/internal/common/common.go @@ -57,6 +57,7 @@ const ( // migrate KEY_MIGRATE_STATUS = "MIGRATE_STATUS" KEY_MIGRATE_COMMON_STATUS = "MIGRATE_COMMON_STATUS" + KEY_CLUSTER_DCS = "CLUSTER_DCS" // check KEY_CHECK_WITH_WEAK = "CHECK_WITH_WEAK" diff --git a/internal/configure/topology/dc_get.go b/internal/configure/topology/dc_get.go index ee8e7c2b1..c18eb0225 100644 --- a/internal/configure/topology/dc_get.go +++ b/internal/configure/topology/dc_get.go @@ -121,7 +121,11 @@ func (dc *DeployConfig) GetInstances() int { return dc.instanc func (dc *DeployConfig) GetHostSequence() int { return dc.hostSequence } func (dc *DeployConfig) GetInstancesSequence() int { return dc.instancesSequence } func (dc *DeployConfig) GetServiceConfig() map[string]string { return dc.serviceConfig } -func (dc *DeployConfig) GetVariables() *variable.Variables { return dc.variables } +func (dc *DeployConfig) SetServiceConfig(key, value string) { + dc.serviceConfig[key] = value +} + +func (dc *DeployConfig) GetVariables() *variable.Variables { return dc.variables } // (2): config item func (dc *DeployConfig) GetPrefix() string { return dc.getString(CONFIG_PREFIX) } diff --git a/internal/configure/topology/variables.go b/internal/configure/topology/variables.go index a2b246805..de123bc33 100644 --- a/internal/configure/topology/variables.go +++ b/internal/configure/topology/variables.go @@ -118,9 
+118,9 @@ var ( {name: "cluster_mds_dummy_addr"}, {name: "cluster_mds_dummy_port"}, {name: "cluster_chunkserver_addr", kind: []string{KIND_CURVEBS}}, - {name: "cluster_snapshotclone_addr", kind: []string{KIND_CURVEBS}}, + {name: "cluster_snapshotclone_addr"}, {name: "cluster_snapshotclone_proxy_addr", kind: []string{KIND_CURVEBS}}, - {name: "cluster_snapshotclone_dummy_port", kind: []string{KIND_CURVEBS}}, + {name: "cluster_snapshotclone_dummy_port"}, {name: "cluster_snapshotclone_nginx_upstream", kind: []string{KIND_CURVEBS}}, {name: "cluster_snapshot_addr"}, // tools-v2: compatible with some old version image {name: "cluster_snapshot_dummy_addr"}, // tools-v2 diff --git a/internal/errno/errno.go b/internal/errno/errno.go index 117460bdf..8bfe176b7 100644 --- a/internal/errno/errno.go +++ b/internal/errno/errno.go @@ -404,6 +404,9 @@ var ( ERR_GET_CHUNKSERVER_COPYSET = EC(410026, "failed to get chunkserver copyset") ERR_GET_MIGRATE_COPYSET = EC(410027, "migrate chunkserver copyset info must be 2") ERR_CONTAINER_NOT_REMOVED = EC(410027, "container not removed") + ERR_GET_CLUSTER_ETCD_ADDR = EC(410028, "failed to get cluster_etcd_addr variable") + ERR_ADD_ETCD_MEMEBER = EC(410029, "failed to add etcd member to existing etcd cluster") + ERR_REMOVE_ETCD_MEMBER = EC(410030, "failed to remove etcd member from existing etcd cluster") // 420: common (curvebs client) ERR_VOLUME_ALREADY_MAPPED = EC(420000, "volume already mapped") ERR_VOLUME_CONTAINER_LOSED = EC(420001, "volume container is losed") diff --git a/internal/playbook/factory.go b/internal/playbook/factory.go index b2c699fda..282a59f78 100644 --- a/internal/playbook/factory.go +++ b/internal/playbook/factory.go @@ -84,6 +84,10 @@ const ( INSTALL_CLIENT UNINSTALL_CLIENT ATTACH_LEADER_OR_RANDOM_CONTAINER + ADD_ETCD_MEMBER + AMEND_ETCD_CONFIG + AMEND_SERVER_CONFIG + REMOVE_ETCD_MEMBER // bs FORMAT_CHUNKFILE_POOL @@ -251,6 +255,14 @@ func (p *Playbook) createTasks(step *PlaybookStep) (*tasks.Tasks, error) { t, err = 
comm.NewInstallClientTask(curveadm, config.GetCC(i)) case UNINSTALL_CLIENT: t, err = comm.NewUninstallClientTask(curveadm, nil) + case ADD_ETCD_MEMBER: + t, err = comm.NewAddEtcdMemberTask(curveadm, config.GetDC(i)) + case AMEND_ETCD_CONFIG: + t, err = comm.NewAmendEtcdConfigTask(curveadm, config.GetDC(i)) + case AMEND_SERVER_CONFIG: + t, err = comm.NewAmendServerConfigTask(curveadm, config.GetDC(i)) + case REMOVE_ETCD_MEMBER: + t, err = comm.NewRemoveEtcdMemberTask(curveadm, config.GetDC(i)) // bs case FORMAT_CHUNKFILE_POOL: t, err = bs.NewFormatChunkfilePoolTask(curveadm, config.GetFC(i)) diff --git a/internal/task/scripts/enable_etcd_auth.go b/internal/task/scripts/enable_etcd_auth.go index a18ff157c..33e14102e 100644 --- a/internal/task/scripts/enable_etcd_auth.go +++ b/internal/task/scripts/enable_etcd_auth.go @@ -12,13 +12,13 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
-*/
+ */
 
 /*
  * Project: Curveadm
  * Created Date: 2023-08-02
  * Author: wanghai (SeanHai)
-*/
+ */
 
 package scripts
 
diff --git a/internal/task/scripts/script.go b/internal/task/scripts/script.go
index e310d42e6..f8cb7d86e 100644
--- a/internal/task/scripts/script.go
+++ b/internal/task/scripts/script.go
@@ -39,6 +39,10 @@ var (
 	WAIT string
 	//go:embed shell/report.sh
 	REPORT string
+	//go:embed shell/add_etcd.sh
+	ADD_ETCD string
+	//go:embed shell/remove_etcd.sh
+	REMOVE_ETCD string
 
 	// CurveBS
 
diff --git a/internal/task/scripts/shell/add_etcd.sh b/internal/task/scripts/shell/add_etcd.sh
new file mode 100644
index 000000000..f5a129196
--- /dev/null
+++ b/internal/task/scripts/shell/add_etcd.sh
@@ -0,0 +1,44 @@
+#!/usr/bin/env bash
+
+# Usage: add_etcd.sh ETCDCTL ENDPOINTS OLD_NAME NEW_NAME NEW_PEER_URL
+# Example: add_etcd.sh /path/to/etcdctl 10.0.0.1:2379,10.0.0.2:2379 etcd00 etcd30 http://10.0.0.4:2380
+# Created Date: 2023-12-15
+# Author: Caoxianfei
+#
+# Adds NEW_NAME (peer URL NEW_PEER_URL) to the etcd cluster reachable via
+# ENDPOINTS. Prints "EXIST" and exits 0 when a member named NEW_NAME is
+# already listed. Exits 1, printing the reason, when the members cannot be
+# listed or the new member cannot be added.
+
+etcdctl=$1
+endpoints=$2
+old_name=$3 # not used here; kept so the positional arguments stay stable
+new_name=$4
+new_peer_url=$5
+
+tmplog=/tmp/_curveadm_add_etcd_
+
+output=$(${etcdctl} --endpoints=${endpoints} member list)
+if [ $? -ne 0 ]; then
+    echo "failed to list all etcd members"
+    exit 1
+fi
+
+# skip when a member with the new name has already been added
+id=$(echo "$output" | awk -v name="$new_name" -F ', ' '$3 == name {print $1}')
+if [ -n "${id}" ]; then
+    echo "EXIST"
+    exit 0
+fi
+
+${etcdctl} --endpoints=${endpoints} member add ${new_name} --peer-urls ${new_peer_url} > ${tmplog} 2>&1
+if [ $? -ne 0 ]; then
+    # a repeated add whose peer URL is already registered is rejected by
+    # etcd; treat that as "already done" so the script can be re-run
+    if grep -q "Peer URLs already exists" ${tmplog}; then
+        exit 0
+    fi
+    # surface the etcdctl error so the caller can report it
+    cat ${tmplog}
+    exit 1
+fi
diff --git a/internal/task/scripts/shell/remove_etcd.sh b/internal/task/scripts/shell/remove_etcd.sh
new file mode 100644
index 000000000..44f1821d5
--- /dev/null
+++ b/internal/task/scripts/shell/remove_etcd.sh
@@ -0,0 +1,32 @@
+#!/usr/bin/env bash
+
+# Usage: remove_etcd.sh ETCDCTL ENDPOINTS OLD_NAME
+# Example: remove_etcd.sh /path/to/etcdctl 10.0.0.1:2379,10.0.0.2:2379 etcd00
+# Created Date: 2023-12-15
+# Author: Caoxianfei
+#
+# Removes the member named OLD_NAME from the etcd cluster reachable via
+# ENDPOINTS. Prints "NOTEXIST" and exits 0 when no such member is listed.
+
+etcdctl=$1
+endpoints=$2
+old_name=$3
+
+output=$(${etcdctl} --endpoints=${endpoints} member list)
+if [ $? -ne 0 ]; then
+    echo "failed to list all etcd members"
+    exit 1
+fi
+
+id=$(echo "$output" | awk -v name="$old_name" -F ', ' '$3 == name {print $1}')
+# if not found the name then exit 0
+if [ -z "${id}" ]; then
+    echo "NOTEXIST"
+    exit 0
+fi
+
+${etcdctl} --endpoints=${endpoints} member remove ${id}
+if [ $? -ne 0 ]; then
+    echo "failed to remove member ${old_name}"
+    exit 1
+fi
diff --git a/internal/task/task/common/add_etcd_mem.go b/internal/task/task/common/add_etcd_mem.go
new file mode 100644
index 000000000..177c08b4a
--- /dev/null
+++ b/internal/task/task/common/add_etcd_mem.go
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) 2023 NetEase Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Project: CurveAdm
+ * Created Date: 2023-12-20
+ * Author: Caoxianfei
+ */
+
+package common
+
+import (
+	"fmt"
+	"strconv"
+
+	"github.com/opencurve/curveadm/cli/cli"
+	comm "github.com/opencurve/curveadm/internal/common"
+	"github.com/opencurve/curveadm/internal/configure"
+	"github.com/opencurve/curveadm/internal/configure/topology"
+	"github.com/opencurve/curveadm/internal/errno"
+	"github.com/opencurve/curveadm/internal/task/context"
+	"github.com/opencurve/curveadm/internal/task/scripts"
+	"github.com/opencurve/curveadm/internal/task/step"
+	"github.com/opencurve/curveadm/internal/task/task"
+	tui "github.com/opencurve/curveadm/internal/tui/common"
+)
+
+// checkAddEtcdMemberStatus builds the lambda that judges the result of
+// add_etcd.sh: a failed run is reported as ERR_ADD_ETCD_MEMEBER carrying the
+// script output, and the "EXIST" marker printed by the script skips the task.
+func checkAddEtcdMemberStatus(success *bool, out *string) step.LambdaType {
+	return func(ctx *context.Context) error {
+		if !*success {
+			return errno.ERR_ADD_ETCD_MEMEBER.S(*out)
+		}
+		if *out == "EXIST" {
+			return task.ERR_SKIP_TASK
+		}
+		return nil
+	}
+}
+
+// NewAddEtcdMemberTask creates the task that installs add_etcd.sh into the
+// container of dc and runs it there, registering the first migrate target
+// (migrates[0].To) as a new etcd member.
+//
+// It returns (nil, nil) when dc is skipped, and an error when the container
+// id, the host, the cluster etcd address or the migrate servers cannot be
+// resolved.
+func NewAddEtcdMemberTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) {
+	serviceId := curveadm.GetServiceId(dc.GetId())
+	containerId, err := curveadm.GetContainerId(serviceId)
+	if curveadm.IsSkip(dc) {
+		return nil, nil
+	} else if err != nil {
+		return nil, err
+	}
+	hc, err := curveadm.GetHost(dc.GetHost())
+	if err != nil {
+		return nil, err
+	}
+
+	subname := fmt.Sprintf("host=%s role=%s containerId=%s",
+		dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId))
+	t := task.NewTask("Add Etcd Member", subname, hc.GetSSHConfig())
+
+	host, role := dc.GetHost(), dc.GetRole()
+	script := scripts.ADD_ETCD
+	layout := dc.GetProjectLayout()
+	scriptPath := fmt.Sprintf("%s/add_etcd.sh", layout.ServiceBinDir)
+	etcdctlPath := layout.ServiceBinDir + "/etcdctl"
+	endpoints, err := dc.GetVariables().Get("cluster_etcd_addr")
+	if err != nil {
+		return nil, errno.ERR_GET_CLUSTER_ETCD_ADDR
+	}
+	oldName := fmt.Sprint("etcd", strconv.Itoa(dc.GetHostSequence()), strconv.Itoa(dc.GetInstancesSequence()))
+	// NOTE(review): the +3 host-sequence offset is hard-coded, and
+	// NewAmendEtcdConfigTask derives the same name from the migrate target
+	// instead of dc; confirm both always yield the same member name.
+	newName := fmt.Sprint("etcd", strconv.Itoa(dc.GetHostSequence()+3), strconv.Itoa(dc.GetInstancesSequence()))
+
+	// The comma-ok assertion covers both a missing key and an unexpected
+	// type; without the length check migrates[0] would panic.
+	migrates, _ := curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS).([]*configure.MigrateServer)
+	if len(migrates) == 0 {
+		return nil, fmt.Errorf("add etcd member: no migrate servers found in memory storage")
+	}
+	toService := migrates[0].To
+	peerUrl := fmt.Sprint("http://", toService.GetListenIp(), ":", strconv.Itoa(toService.GetListenPort()))
+	addEtcdCmd := fmt.Sprintf("/bin/bash %s %s %s %s %s %s", scriptPath, etcdctlPath, endpoints, oldName, newName, peerUrl)
+
+	var success bool
+	var out string
+	t.AddStep(&step.ListContainers{
+		ShowAll:     true,
+		Format:      `"{{.ID}}"`,
+		Filter:      fmt.Sprintf("id=%s", containerId),
+		Out:         &out,
+		ExecOptions: curveadm.ExecOptions(),
+	})
+	t.AddStep(&step.Lambda{
+		Lambda: CheckContainerExist(host, role, containerId, &out),
+	})
+	t.AddStep(&step.InstallFile{
+		ContainerId:       &containerId,
+		ContainerDestPath: scriptPath,
+		Content:           &script,
+		ExecOptions:       curveadm.ExecOptions(),
+	})
+	t.AddStep(&step.ContainerExec{
+		ContainerId: &containerId,
+		Success:     &success,
+		Out:         &out,
+		Command:     addEtcdCmd,
+		ExecOptions: curveadm.ExecOptions(),
+	})
+	t.AddStep(&step.Lambda{
+		Lambda: checkAddEtcdMemberStatus(&success, &out),
+	})
+
+	return t, nil
+}
diff --git a/internal/task/task/common/amend_etcd_config.go b/internal/task/task/common/amend_etcd_config.go
new file mode 100644
index 000000000..cf36fa45a
--- /dev/null
+++ b/internal/task/task/common/amend_etcd_config.go
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) 2023 NetEase Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Project: CurveAdm
+ * Created Date: 2023-12-20
+ * Author: Caoxianfei
+ */
+
+package common
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+
+	"github.com/opencurve/curveadm/cli/cli"
+	comm "github.com/opencurve/curveadm/internal/common"
+	"github.com/opencurve/curveadm/internal/configure"
+	"github.com/opencurve/curveadm/internal/configure/topology"
+	"github.com/opencurve/curveadm/internal/task/step"
+	"github.com/opencurve/curveadm/internal/task/task"
+	tui "github.com/opencurve/curveadm/internal/tui/common"
+)
+
+// Keys of etcd.conf that are rewritten when an etcd service is migrated.
+const (
+	AMEND_NAME      = "name"
+	AMEND_ENDPOINTS = "initial-cluster"
+	AMEND_STATE     = "initial-cluster-state"
+)
+
+// options carries the replacement values from the task constructors in this
+// package to their mutate callbacks, keyed by configuration key.
+var options = make(map[string]interface{})
+
+// mutateEtcdConf returns the callback handed to step.SyncFile as Mutate for
+// etcd.conf. Lines without a key are passed through unchanged; the member
+// name and the initial cluster are replaced with the values stored in
+// options, and the initial cluster state is forced to "existing". Every other
+// key keeps its value.
+func mutateEtcdConf(dc *topology.DeployConfig, delimiter string, forceRender bool) step.Mutate {
+	return func(in, key, value string) (out string, err error) {
+		if len(key) == 0 {
+			out = in
+			return
+		}
+		switch key {
+		case AMEND_NAME, AMEND_ENDPOINTS:
+			// comma-ok: keep the original value rather than panic when
+			// no replacement has been stored for this key
+			if v, ok := options[key].(string); ok {
+				value = v
+			}
+		case AMEND_STATE:
+			value = "existing"
+		}
+
+		out = fmt.Sprintf("%s%s%s", key, delimiter, value)
+		return
+	}
+}
+
+// NewAmendEtcdConfigTask creates the task that rewrites etcd.conf inside the
+// container of dc so that it carries the new member name and an
+// initial-cluster list made of every service in KEY_CLUSTER_DCS plus the
+// first migrate target (migrates[0].To).
+//
+// It returns (nil, nil) when dc is skipped, and an error when the container
+// id, the host or the migrate servers cannot be resolved.
+func NewAmendEtcdConfigTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) {
+	serviceId := curveadm.GetServiceId(dc.GetId())
+	containerId, err := curveadm.GetContainerId(serviceId)
+	if curveadm.IsSkip(dc) {
+		return nil, nil
+	} else if err != nil {
+		return nil, err
+	}
+	hc, err := curveadm.GetHost(dc.GetHost())
+	if err != nil {
+		return nil, err
+	}
+
+	subname := fmt.Sprintf("host=%s role=%s containerId=%s",
+		dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId))
+	t := task.NewTask("Override Etcd configure", subname, hc.GetSSHConfig())
+
+	layout := dc.GetProjectLayout()
+	// The comma-ok assertion covers both a missing key and an unexpected
+	// type; without the length check migrates[0] would panic.
+	migrates, _ := curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS).([]*configure.MigrateServer)
+	if len(migrates) == 0 {
+		return nil, fmt.Errorf("amend etcd config: no migrate servers found in memory storage")
+	}
+	dcs, _ := curveadm.MemStorage().Get(comm.KEY_CLUSTER_DCS).([]*topology.DeployConfig)
+
+	endpoints := make([]string, 0, len(dcs)+1)
+	for _, member := range dcs {
+		ept := fmt.Sprint("etcd", strconv.Itoa(member.GetHostSequence()), strconv.Itoa(member.GetInstancesSequence()),
+			"=", "http://", member.GetListenIp(), ":", strconv.Itoa(member.GetListenPort()))
+		endpoints = append(endpoints, ept)
+	}
+	toService := migrates[0].To
+	// NOTE(review): the +3 host-sequence offset is hard-coded; confirm it
+	// matches the name used when the member is added to the cluster.
+	newName := fmt.Sprint("etcd", strconv.Itoa(toService.GetHostSequence()+3), strconv.Itoa(toService.GetInstancesSequence()))
+	toServiceEndpoint := fmt.Sprint(newName, "=", "http://",
+		toService.GetListenIp(), ":", strconv.Itoa(toService.GetListenPort()))
+	endpoints = append(endpoints, toServiceEndpoint)
+	endpointsStr := strings.Join(endpoints, ",")
+
+	options[AMEND_NAME] = newName
+	options[AMEND_ENDPOINTS] = endpointsStr
+
+	t.AddStep(&step.SyncFile{ // sync etcd.conf config
+		ContainerSrcId:    &containerId,
+		ContainerSrcPath:  layout.ServiceConfPath,
+		ContainerDestId:   &containerId,
+		ContainerDestPath: layout.ServiceConfPath,
+		KVFieldSplit:      ETCD_CONFIG_DELIMITER,
+		Mutate:            mutateEtcdConf(dc, ETCD_CONFIG_DELIMITER, false),
+		ExecOptions:       curveadm.ExecOptions(),
+	})
+
+	return t, nil
+}
diff --git a/internal/task/task/common/amend_server_config.go b/internal/task/task/common/amend_server_config.go
new file mode 100644
index 000000000..a898713a3
--- /dev/null
+++ b/internal/task/task/common/amend_server_config.go
@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2023
NetEase Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Project: CurveAdm
+ * Created Date: 2023-12-20
+ * Author: Caoxianfei
+ */
+
+package common
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+
+	"github.com/opencurve/curveadm/cli/cli"
+	comm "github.com/opencurve/curveadm/internal/common"
+	"github.com/opencurve/curveadm/internal/configure"
+	"github.com/opencurve/curveadm/internal/configure/topology"
+	"github.com/opencurve/curveadm/internal/task/step"
+	"github.com/opencurve/curveadm/internal/task/task"
+	tui "github.com/opencurve/curveadm/internal/tui/common"
+)
+
+// Configuration keys whose endpoint lists are rewritten after a migration.
+const (
+	ETCD_ENDPOINT     = "etcd.endpoint"     // fs
+	MDS_ETCD_ENDPOINT = "mds.etcd.endpoint" // bs
+	MDS_LISTEN_ADDR   = "mds.listen.addr"   // fs and bs
+)
+
+// mutateServerConf returns the callback handed to step.SyncFile as Mutate for
+// a server configuration file. Lines without a key are passed through
+// unchanged. The etcd endpoint keys (fs and bs) and, for every role except
+// mds, the mds listen address are replaced with the values stored in options;
+// a key without a stored replacement keeps its value.
+func mutateServerConf(dc *topology.DeployConfig, delimiter string, forceRender bool) step.Mutate {
+	return func(in, key, value string) (out string, err error) {
+		if len(key) == 0 {
+			out = in
+			return
+		}
+		switch key {
+		case ETCD_ENDPOINT, MDS_ETCD_ENDPOINT:
+			// comma-ok: only the key matching the cluster kind has a
+			// stored replacement, the other one must not panic
+			if v, ok := options[key].(string); ok {
+				value = v
+			}
+		case MDS_LISTEN_ADDR:
+			if dc.GetRole() != topology.ROLE_MDS { // bs mds.conf has config 'mds.listen.addr'
+				if v, ok := options[MDS_LISTEN_ADDR].(string); ok {
+					value = v
+				}
+			}
+		}
+
+		out = fmt.Sprintf("%s%s%s", key, delimiter, value)
+		return
+	}
+}
+
+// NewAmendServerConfigTask creates the task that rewrites the configuration
+// file inside the container of dc so that the endpoint of the first migrate
+// source (migrates[0].From) is replaced by the endpoint of its target
+// (migrates[0].To):
+//   - mds: the etcd address list, using the services' client ports;
+//   - metaserver, chunkserver, snapshotclone: the mds address list, using the
+//     services' listen ports (snapshotclone edits snap_client.conf).
+//
+// It returns (nil, nil) when dc is skipped, and an error when the container
+// id, the host, the address variable or the migrate servers cannot be
+// resolved.
+func NewAmendServerConfigTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) {
+	serviceId := curveadm.GetServiceId(dc.GetId())
+	containerId, err := curveadm.GetContainerId(serviceId)
+	if curveadm.IsSkip(dc) {
+		return nil, nil
+	} else if err != nil {
+		return nil, err
+	}
+	hc, err := curveadm.GetHost(dc.GetHost())
+	if err != nil {
+		return nil, err
+	}
+
+	subname := fmt.Sprintf("host=%s role=%s containerId=%s",
+		dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId))
+	t := task.NewTask("Override Server configure", subname, hc.GetSSHConfig())
+
+	layout := dc.GetProjectLayout()
+	role := dc.GetRole()
+	isMds := role == topology.ROLE_MDS
+	isMdsClient := role == topology.ROLE_METASERVER ||
+		role == topology.ROLE_CHUNKSERVER ||
+		role == topology.ROLE_SNAPSHOTCLONE
+
+	var endpoints string
+	if isMds {
+		endpoints, err = dc.GetVariables().Get("cluster_etcd_addr")
+	} else if isMdsClient {
+		endpoints, err = dc.GetVariables().Get("cluster_mds_addr")
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	// The comma-ok assertion covers both a missing key and an unexpected
+	// type; without the length check migrates[0] would panic.
+	migrates, _ := curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS).([]*configure.MigrateServer)
+	if len(migrates) == 0 {
+		return nil, fmt.Errorf("amend server config: no migrate servers found in memory storage")
+	}
+	fromService := migrates[0].From
+	toService := migrates[0].To
+	var fromListenOrClientPort, toListenOrClientPort string
+	if isMds {
+		// override etcd addr in mds.conf; the target port must come from
+		// the target service, not from the one being replaced
+		fromListenOrClientPort = strconv.Itoa(fromService.GetListenClientPort())
+		toListenOrClientPort = strconv.Itoa(toService.GetListenClientPort())
+	} else if isMdsClient {
+		// override mds addr in metaserver.conf (FS) OR
+		// override mds addr in chunkserver.conf and snap_client.conf (BS)
+		fromListenOrClientPort = strconv.Itoa(fromService.GetListenPort())
+		toListenOrClientPort = strconv.Itoa(toService.GetListenPort())
+	}
+	fromServiceEndpoint := fmt.Sprint(fromService.GetListenIp(), ":", fromListenOrClientPort)
+	toServiceEndpoint := fmt.Sprint(toService.GetListenIp(), ":", toListenOrClientPort)
+
+	// drop the migrated-from endpoint and append the migrated-to one
+	epSlice := strings.Split(endpoints, ",")
+	kept := make([]string, 0, len(epSlice)+1)
+	for _, ep := range epSlice {
+		if ep != fromServiceEndpoint {
+			kept = append(kept, ep)
+		}
+	}
+	kept = append(kept, toServiceEndpoint)
+	endpoints = strings.Join(kept, ",")
+
+	if isMds {
+		if dc.GetKind() == topology.KIND_CURVEFS {
+			options[ETCD_ENDPOINT] = endpoints
+		} else {
+			options[MDS_ETCD_ENDPOINT] = endpoints
+		}
+	} else if isMdsClient {
+		options[MDS_LISTEN_ADDR] = endpoints
+	}
+
+	configPath := layout.ServiceConfPath
+	if role == topology.ROLE_SNAPSHOTCLONE {
+		configPath = layout.ServiceConfDir + "/snap_client.conf"
+	}
+
+	t.AddStep(&step.SyncFile{ // sync the server config again
+		ContainerSrcId:    &containerId,
+		ContainerSrcPath:  configPath,
+		ContainerDestId:   &containerId,
+		ContainerDestPath: configPath,
+		KVFieldSplit:      DEFAULT_CONFIG_DELIMITER,
+		Mutate:            mutateServerConf(dc, DEFAULT_CONFIG_DELIMITER, false),
+		ExecOptions:       curveadm.ExecOptions(),
+	})
+
+	return t, nil
+}
diff --git a/internal/task/task/common/remove_etcd_mem.go b/internal/task/task/common/remove_etcd_mem.go
new file mode 100644
index 000000000..6e3dc0c82
--- /dev/null
+++ b/internal/task/task/common/remove_etcd_mem.go
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2023 NetEase Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Project: CurveAdm
+ * Created Date: 2023-12-20
+ * Author: Caoxianfei
+ */
+
+package common
+
+import (
+	"fmt"
+	"strconv"
+
+	"github.com/opencurve/curveadm/cli/cli"
+	"github.com/opencurve/curveadm/internal/configure/topology"
+	"github.com/opencurve/curveadm/internal/errno"
+	"github.com/opencurve/curveadm/internal/task/context"
+	"github.com/opencurve/curveadm/internal/task/scripts"
+	"github.com/opencurve/curveadm/internal/task/step"
+	"github.com/opencurve/curveadm/internal/task/task"
+	tui "github.com/opencurve/curveadm/internal/tui/common"
+)
+
+// checkRemoveEtcdMemberStatus builds the lambda that judges the result of
+// remove_etcd.sh: a failed run becomes ERR_REMOVE_ETCD_MEMBER carrying the
+// script output, and an output of "NOTEXIST" skips the task.
+func checkRemoveEtcdMemberStatus(success *bool, out *string) step.LambdaType {
+	return func(ctx *context.Context) error {
+		switch {
+		case !*success:
+			return errno.ERR_REMOVE_ETCD_MEMBER.S(*out)
+		case *out == "NOTEXIST":
+			return task.ERR_SKIP_TASK
+		default:
+			return nil
+		}
+	}
+}
+
+// NewRemoveEtcdMemberTask creates the task that installs remove_etcd.sh into
+// the container of dc and runs it there to remove the etcd member named after
+// dc's host and instances sequences.
+//
+// It returns (nil, nil) when dc is skipped, and an error when the container
+// id, the host or the cluster etcd address cannot be resolved.
+func NewRemoveEtcdMemberTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) {
+	containerId, err := curveadm.GetContainerId(curveadm.GetServiceId(dc.GetId()))
+	if curveadm.IsSkip(dc) {
+		return nil, nil
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	host, role := dc.GetHost(), dc.GetRole()
+	hc, err := curveadm.GetHost(host)
+	if err != nil {
+		return nil, err
+	}
+
+	title := fmt.Sprintf("host=%s role=%s containerId=%s",
+		host, role, tui.TrimContainerId(containerId))
+	t := task.NewTask("Remove Old Etcd Member", title, hc.GetSSHConfig())
+
+	// paths of the script and of etcdctl inside the container
+	binDir := dc.GetProjectLayout().ServiceBinDir
+	scriptPath := fmt.Sprintf("%s/remove_etcd.sh", binDir)
+	etcdctlPath := binDir + "/etcdctl"
+
+	etcdAddrs, err := dc.GetVariables().Get("cluster_etcd_addr")
+	if err != nil {
+		return nil, errno.ERR_GET_CLUSTER_ETCD_ADDR
+	}
+	memberName := "etcd" + strconv.Itoa(dc.GetHostSequence()) + strconv.Itoa(dc.GetInstancesSequence())
+	command := fmt.Sprintf("/bin/bash %s %s %s %s", scriptPath, etcdctlPath, etcdAddrs, memberName)
+
+	var (
+		ok      bool
+		output  string
+		content = scripts.REMOVE_ETCD
+	)
+	// 1. make sure the container exists
+	t.AddStep(&step.ListContainers{
+		ShowAll:     true,
+		Format:      `"{{.ID}}"`,
+		Filter:      fmt.Sprintf("id=%s", containerId),
+		Out:         &output,
+		ExecOptions: curveadm.ExecOptions(),
+	})
+	t.AddStep(&step.Lambda{
+		Lambda: CheckContainerExist(host, role, containerId, &output),
+	})
+	// 2. install the script and run it inside the container
+	t.AddStep(&step.InstallFile{
+		ContainerId:       &containerId,
+		ContainerDestPath: scriptPath,
+		Content:           &content,
+		ExecOptions:       curveadm.ExecOptions(),
+	})
+	t.AddStep(&step.ContainerExec{
+		ContainerId: &containerId,
+		Success:     &ok,
+		Out:         &output,
+		Command:     command,
+		ExecOptions: curveadm.ExecOptions(),
+	})
+	// 3. translate the script result into a task result
+	t.AddStep(&step.Lambda{
+		Lambda: checkRemoveEtcdMemberStatus(&ok, &output),
+	})
+
+	return t, nil
+}