diff --git a/Makefile b/Makefile index 8244624db..202db37d1 100644 --- a/Makefile +++ b/Makefile @@ -21,6 +21,7 @@ GO := go # output OUTPUT := bin/curveadm +SERVER_OUTPUT := bin/pigeon # build flags LDFLAGS := -s -w @@ -52,12 +53,14 @@ TEST_FLAGS += -run $(CASE) # packages PACKAGES := $(PWD)/cmd/curveadm/main.go +SERVER_PACKAGES := $(PWD)/cmd/service/main.go # tar VERSION := "unknown" build: $(GOENV) $(GO) build -o $(OUTPUT) $(BUILD_FLAGS) $(PACKAGES) + $(GO) build -o $(SERVER_OUTPUT) $(SERVER_PACKAGES) debug: $(GOENV) $(GO) build -o $(OUTPUT) $(DEBUG_FLAGS) $(PACKAGES) diff --git a/cmd/service/main.go b/cmd/service/main.go new file mode 100644 index 000000000..9db6bc713 --- /dev/null +++ b/cmd/service/main.go @@ -0,0 +1,27 @@ +/* +* Copyright (c) 2023 NetEase Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +package main + +import ( + "github.com/opencurve/curveadm/http" + "github.com/opencurve/pigeon" +) + +func main() { + admServer := http.NewServer() + pigeon.Serve(admServer) +} diff --git a/go.mod b/go.mod index 9d217ec1b..ad6de144b 100644 --- a/go.mod +++ b/go.mod @@ -12,9 +12,11 @@ require ( github.com/jpillora/longestcommon v0.0.0-20161227235612-adb9d91ee629 github.com/kpango/glg v1.6.14 github.com/mattn/go-sqlite3 v1.14.16 + github.com/mcuadros/go-defaults v1.2.0 github.com/melbahja/goph v1.3.0 github.com/mitchellh/hashstructure/v2 v2.0.2 github.com/moby/term v0.0.0-20221205130635-1aeaba878587 + github.com/opencurve/pigeon v0.0.0-20230512031044-d5a430bb02a4 github.com/pingcap/log v1.1.0 github.com/sergi/go-diff v1.2.0 github.com/spf13/cobra v1.7.0 @@ -25,10 +27,46 @@ require ( golang.org/x/crypto v0.8.0 ) +require ( + github.com/Wine93/grace v0.0.0-20221021033009-7d0348013a3c // indirect + github.com/bytedance/sonic v1.8.7 // indirect + github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect + github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect + github.com/facebookgo/grace v0.0.0-20180706040059-75cf19382434 // indirect + github.com/facebookgo/httpdown v0.0.0-20180706035922-5979d39b15c2 // indirect + github.com/facebookgo/stats v0.0.0-20151006221625-1b76add642e4 // indirect + github.com/gin-contrib/sse v0.1.0 // indirect + github.com/gin-gonic/gin v1.9.0 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-playground/validator/v10 v10.12.0 // indirect + github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect + github.com/golang/mock v1.6.0 // indirect + github.com/google/pprof v0.0.0-20230406165453-00490a63f317 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/imroc/req/v3 v3.33.2 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect + github.com/klauspost/cpuid/v2 v2.2.4 // indirect + github.com/leodido/go-urn v1.2.3 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/onsi/ginkgo/v2 v2.9.2 // indirect + github.com/quic-go/qpack v0.4.0 // indirect + github.com/quic-go/qtls-go1-19 v0.3.2 // indirect + github.com/quic-go/qtls-go1-20 v0.2.2 // indirect + github.com/quic-go/quic-go v0.33.0 // indirect + github.com/sevlyar/go-daemon v0.1.6 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + github.com/ugorji/go/codec v1.2.11 // indirect + golang.org/x/arch v0.3.0 // indirect + golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect +) + require ( github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect - github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d // indirect github.com/VividCortex/ewma v1.2.0 // indirect github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect github.com/benbjohnson/clock v1.3.0 // indirect @@ -81,18 +119,17 @@ require ( github.com/subosito/gotenv v1.4.2 // indirect github.com/theupdateframework/notary v0.7.0 // indirect go.uber.org/atomic v1.10.0 // indirect - go.uber.org/multierr v1.8.0 // indirect - golang.org/x/mod v0.9.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + golang.org/x/mod v0.10.0 // indirect golang.org/x/net v0.9.0 // indirect golang.org/x/sys v0.7.0 // indirect golang.org/x/term v0.7.0 // indirect golang.org/x/text v0.9.0 // indirect - golang.org/x/tools v0.7.0 // indirect - google.golang.org/protobuf v1.29.1 // indirect + golang.org/x/tools v0.8.0 // indirect + google.golang.org/protobuf v1.30.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - gotest.tools/v3 v3.0.3 // indirect ) replace github.com/melbahja/goph v1.3.0 => github.com/Wine93/goph v0.0.0-20220907033045-3b286d827fb3 diff --git a/http/core/core.go b/http/core/core.go new file mode 100644 index 000000000..d10f503ea --- /dev/null +++ b/http/core/core.go @@ -0,0 +1,71 @@ +/* +* Copyright (c) 2023 NetEase Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +package core + +import ( + "fmt" + "strconv" + + "github.com/opencurve/curveadm/internal/errno" + "github.com/opencurve/pigeon" +) + +func Exit(r *pigeon.Request, err error) bool { + var status int + if err == nil { + status = 200 + r.SendJSON(pigeon.JSON{ + "errorCode": "0", + "errorMsg": "success", + }) + } else { + code := err.(*errno.ErrorCode) + if code.IsHttpErr() { + status = code.HttpCode() + } else { + status = 503 + } + r.SendJSON(pigeon.JSON{ + "errorCode": strconv.Itoa(code.GetCode()), + "errorMsg": fmt.Sprintf("desc: %s; clue: %s", code.GetDescription(), code.GetClue()), + }) + } + return r.Exit(status) +} + +func Default(r *pigeon.Request) bool { + r.Logger().Warn("unupport request uri", pigeon.Field("uri", r.Uri)) + return Exit(r, errno.ERR_UNSUPPORT_REQUEST_URI) +} + +func ExitSuccessWithData(r *pigeon.Request, data interface{}) bool { + r.SendJSON(pigeon.JSON{ + "data": data, + "errorCode": "0", + "errorMsg": "success", + }) + return r.Exit(200) +} + +func ExitFailWithData(r *pigeon.Request, data interface{}, message string) bool { + r.SendJSON(pigeon.JSON{ + "errorCode": "503", + "errorMsg": message, + "data": data, + }) + return r.Exit(503) +} diff --git a/http/manager/bind.go b/http/manager/bind.go new file mode 100644 index 000000000..0dece3d83 --- /dev/null +++ b/http/manager/bind.go @@ -0,0 +1,81 @@ +/* +* Copyright (c) 2023 NetEase Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +package manager + +import ( + "github.com/opencurve/pigeon" + "mime/multipart" + "net/http" +) + +var METHOD_REQUEST map[string]Request + +type ( + HandlerFunc func(r *pigeon.Request, ctx *Context) bool + + Context struct { + Data interface{} + } + + Request struct { + httpMethod string + method string + vType interface{} + handler HandlerFunc + } +) + +func init() { + METHOD_REQUEST = map[string]Request{} + for _, request := range requests { + METHOD_REQUEST[request.method] = request + } +} + +type DeployClusterCmdRequest struct { + Command string `json:"command" binding:"required"` +} + +type DeployClusterUploadRequest struct { + FilePath string `json:"filepath" form:"filepath" binding:"required"` + File *multipart.FileHeader `form:"file" binding:"required"` +} + +type DeployClusterDownloadRequest struct { + FilePath string `json:"filepath" form:"filepath" binding:"required"` +} + +var requests = []Request{ + { + http.MethodPost, + "cluster.deploy.cmd", + DeployClusterCmdRequest{}, + DeployClusterCmd, + }, + { + http.MethodPost, + "cluster.deploy.upload", + DeployClusterUploadRequest{}, + DeployClusterUpload, + }, + { + http.MethodGet, + "cluster.deploy.download", + DeployClusterDownloadRequest{}, + DeployClusterDownload, + }, +} diff --git a/http/manager/entrypoint.go b/http/manager/entrypoint.go new file mode 100644 index 000000000..bd81da5cb --- /dev/null +++ b/http/manager/entrypoint.go @@ -0,0 +1,50 @@ +/* +* Copyright (c) 2023 NetEase Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +package manager + +import ( + "reflect" + + "github.com/mcuadros/go-defaults" + "github.com/opencurve/curveadm/http/core" + "github.com/opencurve/curveadm/internal/errno" + "github.com/opencurve/pigeon" +) + +func Entrypoint(r *pigeon.Request) bool { + if r.Method != pigeon.HTTP_METHOD_GET && + r.Method != pigeon.HTTP_METHOD_POST { + return core.Exit(r, errno.ERR_UNSUPPORT_HTTP_METHOD) + } + + request, ok := METHOD_REQUEST[r.Args["method"]] + if !ok { + return core.Exit(r, errno.ERR_UNSUPPORT_METHOD_ARGUMENT) + } else if request.httpMethod != r.Method { + return core.Exit(r, errno.ERR_HTTP_METHOD_MISMATCHED) + } + + vType := reflect.TypeOf(request.vType) + data := reflect.New(vType).Interface() + if err := r.BindBody(data); err != nil { + r.Logger().Error("bad request form param", + pigeon.Field("error", err)) + return core.Exit(r, errno.ERR_BAD_REQUEST_FORM_PARAM) + } + defaults.SetDefaults(data) + return request.handler(r, &Context{data}) +} diff --git a/http/manager/manager.go b/http/manager/manager.go new file mode 100644 index 000000000..06e53cfe8 --- /dev/null +++ b/http/manager/manager.go @@ -0,0 +1,70 @@ +/* +* Copyright (c) 2023 NetEase Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +package manager + +import ( + "github.com/opencurve/curveadm/http/core" + "github.com/opencurve/curveadm/internal/utils" + "github.com/opencurve/pigeon" + "io" + "os/exec" +) + +func DeployClusterCmd(r *pigeon.Request, ctx *Context) bool { + data := ctx.Data.(*DeployClusterCmdRequest) + r.Logger().Info("DeployClusterCmd", pigeon.Field("command", data.Command)) + cmd := exec.Command("/bin/bash", "-c", data.Command) + out, err := cmd.CombinedOutput() + if err != nil { + r.Logger().Warn("DeployClusterCmd failed when execute command", + pigeon.Field("error", err)) + return core.ExitFailWithData(r, string(out), string(out)) + } + r.Logger().Info("DeployClusterCmd", pigeon.Field("result", out)) + return core.ExitSuccessWithData(r, string(out)) +} + +func DeployClusterUpload(r *pigeon.Request, ctx *Context) bool { + data := ctx.Data.(*DeployClusterUploadRequest) + r.Logger().Info("DeployClusterUpload", pigeon.Field("file", data.FilePath)) + mf, err := data.File.Open() + if err != nil { + r.Logger().Warn("DeployClusterUpload failed when open file", + pigeon.Field("error", err)) + return core.ExitFailWithData(r, err.Error(), err.Error()) + } + defer mf.Close() + content, err := io.ReadAll(mf) + if err != nil { + r.Logger().Warn("DeployClusterUpload failed when read file", + pigeon.Field("error", err)) + return core.ExitFailWithData(r, err.Error(), err.Error()) + } + err = utils.WriteFile(data.FilePath, string(content), 0644) + if err != nil { + r.Logger().Warn("DeployClusterUpload failed when write file", + pigeon.Field("error", err)) + return core.ExitFailWithData(r, err.Error(), err.Error()) + } + return core.Exit(r, err) +} + +func DeployClusterDownload(r *pigeon.Request, ctx *Context) bool { + data := ctx.Data.(*DeployClusterDownloadRequest) + r.Logger().Info("DeployClusterDownload", pigeon.Field("file", data.FilePath)) + return r.SendFile(data.FilePath) +} diff --git a/http/server.go b/http/server.go new file mode 100644 index 000000000..82fface6e --- /dev/null +++ b/http/server.go @@ -0,0 +1,30 @@ +/* +* Copyright (c) 2023 NetEase Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +package http + +import ( + "github.com/opencurve/curveadm/http/core" + "github.com/opencurve/curveadm/http/manager" + "github.com/opencurve/pigeon" +) + +func NewServer() *pigeon.HTTPServer { + server := pigeon.NewHTTPServer("curveadm") + server.Route("/", manager.Entrypoint) + server.DefaultRoute(core.Default) + return server +} diff --git a/internal/configure/hosts/hc_get.go b/internal/configure/hosts/hc_get.go index c7c0c6a07..219aeff57 100644 --- a/internal/configure/hosts/hc_get.go +++ b/internal/configure/hosts/hc_get.go @@ -85,7 +85,13 @@ func (hc *HostConfig) GetUser() string { return user } +func (hc *HostConfig) GetProtocol() string { return hc.getString(CONFIG_PROTOCOL) } func (hc *HostConfig) GetSSHConfig() *module.SSHConfig { + + if hc.GetProtocol() != SSH_PROTOCOL { + return nil + } + hostname := hc.GetSSHHostname() if len(hostname) == 0 { hostname = hc.GetHostname() @@ -103,3 +109,17 @@ func (hc *HostConfig) GetSSHConfig() *module.SSHConfig { ConnectRetries: curveadm.GlobalCurveAdmConfig.GetSSHRetries(), } } + +func (hc *HostConfig) GetHttpConfig() *module.HttpConfig { + + if hc.GetProtocol() != HTTP_PROTOCOL { + return nil + } + + return &module.HttpConfig{ + Host: hc.GetHostname(), + Port: (uint)(hc.GetHTTPPort()), + } +} + +func (hc *HostConfig) GetHTTPPort() int { return hc.getInt(CONFIG_HTTP_PORT) } diff --git a/internal/configure/hosts/hc_item.go b/internal/configure/hosts/hc_item.go index 6900cbd63..a7fb50cc8 100644 --- a/internal/configure/hosts/hc_item.go +++ b/internal/configure/hosts/hc_item.go @@ -32,7 +32,10 @@ import ( ) const ( - DEFAULT_SSH_PORT = 22 + DEFAULT_SSH_PORT = 22 + DEFAULT_HTTP_PORT = 8000 + SSH_PROTOCOL = "ssh" + HTTP_PROTOCOL = "http" ) var ( @@ -97,4 +100,18 @@ var ( false, nil, ) + + CONFIG_PROTOCOL = itemset.Insert( + "protocol", + comm.REQUIRE_STRING, + false, + SSH_PROTOCOL, + ) + + CONFIG_HTTP_PORT = itemset.Insert( + "http_port", + comm.REQUIRE_POSITIVE_INTEGER, + false, + DEFAULT_HTTP_PORT, + ) ) diff --git a/internal/errno/errno.go b/internal/errno/errno.go index 46b8228c5..e96daf3c3 100644 --- a/internal/errno/errno.go +++ b/internal/errno/errno.go @@ -123,6 +123,14 @@ func (e *ErrorCode) Error() string { return tui.PromptErrorCode(e.code, e.description, e.clue, gLogpath) } +func (e *ErrorCode) IsHttpErr() bool { + return e.code/10000 == 70 +} + +func (e *ErrorCode) HttpCode() int { + return e.code % 1000 +} + /* * 0xx: init curveadm * @@ -447,7 +455,8 @@ var ( ERR_METASERVER_REQUIRES_3_HOSTS = EC(503009, "metaserver requires at least 3 hosts to distrubute zones") // 510: checker (ssh) - ERR_SSH_CONNECT_FAILED = EC(510000, "SSH connect failed") + ERR_SSH_CONNECT_FAILED = EC(510000, "SSH connect failed") + ERR_HTTP_CONNECT_FAILED = EC(510001, "HTTP connect failed") // 520: checker (permission) ERR_USER_NOT_FOUND = EC(520000, "user not found") @@ -549,6 +558,13 @@ var ( // 690: execuetr task (others) ERR_START_CRONTAB_IN_CONTAINER_FAILED = EC(690000, "start crontab in container failed") + // 70: http service + ERR_UNSUPPORT_REQUEST_URI = EC(701400, "unsupport request uri") + ERR_UNSUPPORT_METHOD_ARGUMENT = EC(702400, "unsupport method argument") + ERR_HTTP_METHOD_MISMATCHED = EC(703400, "http method mismatch") + ERR_BAD_REQUEST_FORM_PARAM = EC(704400, "bad request form param") + ERR_UNSUPPORT_HTTP_METHOD = EC(705405, "unsupport http method") + // 900: others ERR_CANCEL_OPERATION = EC(CODE_CANCEL_OPERATION, "cancel operation") // 999 diff --git a/internal/task/context/context.go b/internal/task/context/context.go index bbc758373..693e170f6 100644 --- a/internal/task/context/context.go +++ b/internal/task/context/context.go @@ -29,27 +29,27 @@ import ( ) type Context struct { - sshClient *module.SSHClient - module *module.Module - register *Register + remoteClient module.RemoteClient + module *module.Module + register *Register } -func NewContext(sshClient *module.SSHClient) (*Context, error) { +func NewContext(remoteClient module.RemoteClient) (*Context, error) { return &Context{ - sshClient: sshClient, - module: module.NewModule(sshClient), - register: NewRegister(), + remoteClient: remoteClient, + module: module.NewModule(remoteClient), + register: NewRegister(), }, nil } func (ctx *Context) Close() { - if ctx.sshClient != nil { - ctx.sshClient.Client().Close() + if ctx.remoteClient != nil { + ctx.remoteClient.Close() } } -func (ctx *Context) SSHClient() *module.SSHClient { - return ctx.sshClient +func (ctx *Context) RemoteClient() module.RemoteClient { + return ctx.remoteClient } func (ctx *Context) Module() *module.Module { diff --git a/internal/task/step/shell.go b/internal/task/step/shell.go index 2545b0f7a..88a08668d 100644 --- a/internal/task/step/shell.go +++ b/internal/task/step/shell.go @@ -607,18 +607,20 @@ func (s *Scp) Execute(ctx *context.Context) error { return errno.ERR_WRITE_FILE_FAILED.E(err) } - config := ctx.SSHClient().Config() - cmd := ctx.Module().Shell().Scp(localPath, config.User, config.Host, s.RemotePath) - cmd.AddOption("-P %d", config.Port) - if !config.ForwardAgent { - cmd.AddOption("-i %s", config.PrivateKeyPath) - } + //config := ctx.SSHClient().Config() + //cmd := ctx.Module().Shell().Scp(localPath, config.User, config.Host, s.RemotePath) + //cmd.AddOption("-P %d", config.Port) + //if !config.ForwardAgent { + // cmd.AddOption("-i %s", config.PrivateKeyPath) + //} + + err = ctx.Module().File().Upload(localPath, s.RemotePath) options := s.ExecOptions options.ExecWithSudo = false options.ExecInLocal = true - out, err := cmd.Execute(options) - return PostHandle(nil, nil, out, err, errno.ERR_SECURE_COPY_FILE_TO_REMOTE_FAILED) + //out, err := cmd.Execute(options) + return PostHandle(nil, nil, "", err, errno.ERR_SECURE_COPY_FILE_TO_REMOTE_FAILED) } func (s *Command) Execute(ctx *context.Context) error { diff --git a/internal/task/task/bs/add_target.go b/internal/task/task/bs/add_target.go index 19f73c5d5..83a721dd3 100644 --- a/internal/task/task/bs/add_target.go +++ b/internal/task/task/bs/add_target.go @@ -52,7 +52,7 @@ func NewAddTargetTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig) (*task } subname := fmt.Sprintf("host=%s volume=%s", options.Host, volume) - t := task.NewTask("Add Target", subname, hc.GetSSHConfig()) + t := task.NewTask("Add Target", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step var output string diff --git a/internal/task/task/bs/balance_leader.go b/internal/task/task/bs/balance_leader.go index 2c29f75a7..565688233 100644 --- a/internal/task/task/bs/balance_leader.go +++ b/internal/task/task/bs/balance_leader.go @@ -48,7 +48,7 @@ func NewBalanceTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Ta subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Balance Leader", subname, hc.GetSSHConfig()) + t := task.NewTask("Balance Leader", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step t.AddStep(&step.ContainerExec{ diff --git a/internal/task/task/bs/create_volume.go b/internal/task/task/bs/create_volume.go index 0142424f2..44e34e43b 100644 --- a/internal/task/task/bs/create_volume.go +++ b/internal/task/task/bs/create_volume.go @@ -106,7 +106,7 @@ func NewCreateVolumeTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig) (*t } subname := fmt.Sprintf("hostname=%s image=%s", hc.GetHostname(), cc.GetContainerImage()) - t := task.NewTask("Create Volume", subname, hc.GetSSHConfig()) + t := task.NewTask("Create Volume", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step var out string diff --git a/internal/task/task/bs/delete_target.go b/internal/task/task/bs/delete_target.go index 562f4e821..9b86a1736 100644 --- a/internal/task/task/bs/delete_target.go +++ b/internal/task/task/bs/delete_target.go @@ -58,7 +58,7 @@ func NewDeleteTargetTask(curveadm *cli.CurveAdm, cc *client.ClientConfig) (*task } subname := fmt.Sprintf("hostname=%s tid=%s", hc.GetHostname(), options.Tid) - t := task.NewTask("Delete Target", subname, hc.GetSSHConfig()) + t := task.NewTask("Delete Target", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step var output string diff --git a/internal/task/task/bs/detect_release.go b/internal/task/task/bs/detect_release.go index b1cb0a23a..e61ce8da3 100644 --- a/internal/task/task/bs/detect_release.go +++ b/internal/task/task/bs/detect_release.go @@ -75,7 +75,7 @@ func NewDetectOSReleaseTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, var success bool var out string subname := fmt.Sprintf("host=%s", host) - t := task.NewTask("Detect OS Release", subname, hc.GetSSHConfig()) + t := task.NewTask("Detect OS Release", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task t.AddStep(&step.Cat{ diff --git a/internal/task/task/bs/format.go b/internal/task/task/bs/format.go index 1ea0f9de9..cf7a2b71b 100644 --- a/internal/task/task/bs/format.go +++ b/internal/task/task/bs/format.go @@ -182,7 +182,7 @@ func NewFormatChunkfilePoolTask(curveadm *cli.CurveAdm, fc *configure.FormatConf chunkSize := fc.GetChunkSize() subname := fmt.Sprintf("host=%s device=%s mountPoint=%s usage=%d%%", fc.GetHost(), device, mountPoint, usagePercent) - t := task.NewTask("Start Format Chunkfile Pool", subname, hc.GetSSHConfig()) + t := task.NewTask("Start Format Chunkfile Pool", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var output, containerId, oldUUID string diff --git a/internal/task/task/bs/format_status.go b/internal/task/task/bs/format_status.go index 121dd80b9..396f5d6e3 100644 --- a/internal/task/task/bs/format_status.go +++ b/internal/task/task/bs/format_status.go @@ -26,10 +26,10 @@ import ( "fmt" "strings" - comm "github.com/opencurve/curveadm/internal/common" - "github.com/opencurve/curveadm/internal/errno" "github.com/opencurve/curveadm/cli/cli" + comm "github.com/opencurve/curveadm/internal/common" "github.com/opencurve/curveadm/internal/configure" + "github.com/opencurve/curveadm/internal/errno" "github.com/opencurve/curveadm/internal/task/context" "github.com/opencurve/curveadm/internal/task/step" "github.com/opencurve/curveadm/internal/task/task" @@ -122,7 +122,7 @@ func NewGetFormatStatusTask(curveadm *cli.CurveAdm, fc *configure.FormatConfig) // new task device := fc.GetDevice() subname := fmt.Sprintf("host=%s device=%s", fc.GetHost(), fc.GetDevice()) - t := task.NewTask("Get Format Status", subname, hc.GetSSHConfig()) + t := task.NewTask("Get Format Status", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var deviceUsage, containerStatus string diff --git a/internal/task/task/bs/format_stop.go b/internal/task/task/bs/format_stop.go index 6332e3f05..25fdffc0d 100644 --- a/internal/task/task/bs/format_stop.go +++ b/internal/task/task/bs/format_stop.go @@ -79,7 +79,7 @@ func NewStopFormatTask(curveadm *cli.CurveAdm, fc *configure.FormatConfig) (*tas containerName := device2ContainerName(device) subname := fmt.Sprintf("host=%s device=%s mountPoint=%s containerName=%s", fc.GetHost(), device, mountPoint, containerName) - t := task.NewTask("Stop Format Chunkfile Pool", subname, hc.GetSSHConfig()) + t := task.NewTask("Stop Format Chunkfile Pool", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) var oldContainerId string var oldUUID string diff --git a/internal/task/task/bs/install_polarfs.go b/internal/task/task/bs/install_polarfs.go index 9b7976d36..a43cbe0b9 100644 --- a/internal/task/task/bs/install_polarfs.go +++ b/internal/task/task/bs/install_polarfs.go @@ -104,7 +104,7 @@ func NewInstallPolarFSTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig) ( // new task release := getRelease(curveadm) subname := fmt.Sprintf("host=%s release=%s", host, release) - t := task.NewTask("Install PolarFS", subname, hc.GetSSHConfig()) + t := task.NewTask("Install PolarFS", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var input, output string diff --git a/internal/task/task/bs/list_targets.go b/internal/task/task/bs/list_targets.go index cab076dbb..7ce2412dc 100644 --- a/internal/task/task/bs/list_targets.go +++ b/internal/task/task/bs/list_targets.go @@ -119,7 +119,7 @@ func NewListTargetsTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, erro } subname := fmt.Sprintf("host=%s", hc.GetHostname()) - t := task.NewTask("List Targets", subname, hc.GetSSHConfig()) + t := task.NewTask("List Targets", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step var output string diff --git a/internal/task/task/bs/map.go b/internal/task/task/bs/map.go index b22a31ebf..68eb0f6af 100644 --- a/internal/task/task/bs/map.go +++ b/internal/task/task/bs/map.go @@ -79,7 +79,7 @@ func NewMapTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig) (*task.Task, } subname := fmt.Sprintf("hostname=%s volume=%s:%s", hc.GetHostname(), options.User, options.Volume) - t := task.NewTask("Map Volume", subname, hc.GetSSHConfig()) + t := task.NewTask("Map Volume", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step var out string diff --git a/internal/task/task/bs/start_nebd.go b/internal/task/task/bs/start_nebd.go index a6ab93de5..e2a2eef5c 100644 --- a/internal/task/task/bs/start_nebd.go +++ b/internal/task/task/bs/start_nebd.go @@ -154,7 +154,7 @@ func NewStartNEBDServiceTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig) } subname := fmt.Sprintf("hostname=%s image=%s", hc.GetHostname(), cc.GetContainerImage()) - t := task.NewTask("Start NEBD Service", subname, hc.GetSSHConfig()) + t := task.NewTask("Start NEBD Service", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step var containerId, out string diff --git a/internal/task/task/bs/start_tgtd.go b/internal/task/task/bs/start_tgtd.go index fbd9ca462..cca7005dd 100644 --- a/internal/task/task/bs/start_tgtd.go +++ b/internal/task/task/bs/start_tgtd.go @@ -66,7 +66,7 @@ func NewStartTargetDaemonTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig // new task subname := fmt.Sprintf("host=%s image=%s", options.Host, cc.GetContainerImage()) - t := task.NewTask("Start Target Daemon", subname, hc.GetSSHConfig()) + t := task.NewTask("Start Target Daemon", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var status, containerId, out string diff --git a/internal/task/task/bs/stop_tgtd.go b/internal/task/task/bs/stop_tgtd.go index 0725c0355..c65130cf9 100644 --- a/internal/task/task/bs/stop_tgtd.go +++ b/internal/task/task/bs/stop_tgtd.go @@ -50,7 +50,7 @@ func NewStopTargetDaemonTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, // new task subname := fmt.Sprintf("host=%s", options.Host) - t := task.NewTask("Stop Target Daemon", subname, hc.GetSSHConfig()) + t := task.NewTask("Stop Target Daemon", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step var containerId string diff --git a/internal/task/task/bs/uninstall_polarfs.go b/internal/task/task/bs/uninstall_polarfs.go index c228403ca..c89d98547 100644 --- a/internal/task/task/bs/uninstall_polarfs.go +++ b/internal/task/task/bs/uninstall_polarfs.go @@ -77,7 +77,7 @@ func NewUninstallPolarFSTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, // new task release := getRelease(curveadm) subname := fmt.Sprintf("host=%s release=%s", host, release) - t := task.NewTask("Uninstall PolarFS", subname, hc.GetSSHConfig()) + t := task.NewTask("Uninstall PolarFS", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task t.AddStep(&step.RemoveFile{ diff --git a/internal/task/task/bs/unmap.go b/internal/task/task/bs/unmap.go index 3e1dee29b..b04fea22e 100644 --- a/internal/task/task/bs/unmap.go +++ b/internal/task/task/bs/unmap.go @@ -144,7 +144,7 @@ func NewUnmapTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, error) { subname := fmt.Sprintf("hostname=%s volume=%s:%s containerId=%s", hc.GetHostname(), options.User, options.Volume, tui.TrimContainerId(containerId)) - t := task.NewTask("Unmap Volume", subname, hc.GetSSHConfig()) + t := task.NewTask("Unmap Volume", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step var output string diff --git a/internal/task/task/checker/date.go b/internal/task/task/checker/date.go index 41a4900aa..95777db2a 100644 --- a/internal/task/task/checker/date.go +++ b/internal/task/task/checker/date.go @@ -87,7 +87,7 @@ func NewGetHostDate(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Ta } subname := fmt.Sprintf("host=%s start=%d", dc.GetHost(), time.Now().Unix()) - t := task.NewTask("Get Host Date ", subname, hc.GetSSHConfig()) + t := task.NewTask("Get Host Date ", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) var start int64 var out string @@ -132,7 +132,7 @@ func checkDate(curveadm *cli.CurveAdm) step.LambdaType { } func NewCheckDate(curveadm *cli.CurveAdm, c interface{}) (*task.Task, error) { - t := task.NewTask("Check Host Date ", "", nil) + t := task.NewTask("Check Host Date ", "", nil, nil) t.AddStep(&step.Lambda{ Lambda: checkDate(curveadm), }) diff --git a/internal/task/task/checker/kernel.go b/internal/task/task/checker/kernel.go index 3db2ed1d9..58522f1fc 100644 --- a/internal/task/task/checker/kernel.go +++ b/internal/task/task/checker/kernel.go @@ -106,7 +106,7 @@ func NewCheckKernelVersionTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig // new task subname := fmt.Sprintf("host=%s role=%s require=(>=%s)", dc.GetHost(), dc.GetRole(), CHUNKSERVER_LEAST_KERNEL_VERSION) - t := task.NewTask("Check Kernel Version ", subname, hc.GetSSHConfig()) + t := task.NewTask("Check Kernel Version ", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string @@ -132,7 +132,7 @@ func NewCheckKernelModuleTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig // new task name := curveadm.MemStorage().Get(comm.KEY_CHECK_KERNEL_MODULE_NAME).(string) subname := fmt.Sprintf("host=%s module=%s", host, name) - t := task.NewTask("Check Kernel Module", subname, hc.GetSSHConfig()) + t := task.NewTask("Check Kernel Module", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/checker/network.go b/internal/task/task/checker/network.go index ece0e0595..4c96e1d1a 100644 --- a/internal/task/task/checker/network.go +++ b/internal/task/task/checker/network.go @@ -134,7 +134,7 @@ func NewCheckPortInUseTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (* addresses := getServiceListenAddresses(dc) subname := fmt.Sprintf("host=%s role=%s ports={%s}", dc.GetHost(), dc.GetRole(), joinPorts(dc, addresses)) - t := task.NewTask("Check Port In Use ", subname, hc.GetSSHConfig()) + t := task.NewTask("Check Port In Use ", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) var containerId, out string var success bool @@ -203,7 +203,7 @@ func NewCheckDestinationReachableTask(curveadm *cli.CurveAdm, dc *topology.Deplo addresses := unique(getServiceConnectAddress(dc, dcs)) subname := fmt.Sprintf("host=%s role=%s ping={%s}", dc.GetHost(), dc.GetRole(), tui.TrimAddress(strings.Join(addresses, ","))) - t := task.NewTask("Check Destination Reachable ", subname, hc.GetSSHConfig()) + t := task.NewTask("Check Destination Reachable ", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) var out string var success bool @@ -258,7 +258,7 @@ func NewStartHTTPServerTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) ( addresses := getServiceListenAddresses(dc) subname := fmt.Sprintf("host=%s role=%s ports={%s}", dc.GetHost(), dc.GetRole(), joinPorts(dc, addresses)) - t := task.NewTask("Start Mock HTTP Server ", subname, hc.GetSSHConfig()) + t := task.NewTask("Start Mock HTTP Server ", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var containerId, out string @@ -329,7 +329,7 @@ func NewCheckNetworkFirewallTask(curveadm *cli.CurveAdm, dc *topology.DeployConf // add task subname := fmt.Sprintf("host=%s role=%s", dc.GetHost(), dc.GetRole()) - t := task.NewTask("Check Network Firewall ", subname, hc.GetSSHConfig()) + t := task.NewTask("Check Network Firewall ", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string @@ -395,7 +395,7 @@ func NewCleanEnvironmentTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) // new task subname := fmt.Sprintf("host=%s role=%s", dc.GetHost(), dc.GetRole()) - t := task.NewTask("Clean Precheck Environment", subname, hc.GetSSHConfig()) + t := task.NewTask("Clean Precheck Environment", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/checker/permission.go b/internal/task/task/checker/permission.go index 42504ce0b..a4a116875 100644 --- a/internal/task/task/checker/permission.go +++ b/internal/task/task/checker/permission.go @@ -119,7 +119,7 @@ func NewCheckPermissionTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) ( // new task subname := fmt.Sprintf("host=%s role=%s", dc.GetHost(), dc.GetRole()) - t := task.NewTask("Check Permission ", subname, hc.GetSSHConfig()) + t := task.NewTask("Check Permission ", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out, hostname string @@ -145,7 +145,7 @@ func NewCheckPermissionTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) ( t.AddStep(&step.Ping{ Destination: &hostname, Count: 1, - Timeout: 1, + Timeout: 1, Success: &success, ExecOptions: curveadm.ExecOptions(), }) diff --git a/internal/task/task/checker/service.go b/internal/task/task/checker/service.go index f762f1942..cb19f443a 100644 --- a/internal/task/task/checker/service.go +++ b/internal/task/task/checker/service.go @@ -138,7 +138,7 @@ func NewCheckChunkfilePoolTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig } subname := fmt.Sprintf("host=%s role=%s", dc.GetHost(), dc.GetRole()) - t := task.NewTask("Check Chunkfile Pool ", subname, hc.GetSSHConfig()) + t := task.NewTask("Check Chunkfile Pool ", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) t.AddStep(&step2CheckChunkfilePool{ dc: dc, @@ -150,7 +150,7 @@ func NewCheckChunkfilePoolTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig func NewCheckS3Task(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) { subname := fmt.Sprintf("host=%s role=%s", dc.GetHost(), dc.GetRole()) - t := task.NewTask("Check S3", subname, nil) + t := task.NewTask("Check S3", subname, nil, nil) t.AddStep(&step2CheckS3{ s3AccessKey: dc.GetS3AccessKey(), @@ -171,13 +171,13 @@ func NewCheckMdsAddressTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig) address := cc.GetClusterMDSAddr() subname := fmt.Sprintf("host=%s address=%s", host, address) - t := task.NewTask("Check MDS Address", subname, hc.GetSSHConfig()) + t := task.NewTask("Check MDS Address", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) return t, nil } func NewClientS3ConfigureTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig) (*task.Task, error) { - t := task.NewTask("Check S3 Configure ", "", nil) + t := task.NewTask("Check S3 Configure ", "", nil, nil) t.AddStep(&step2CheckClientS3Configure{ config: cc, diff --git a/internal/task/task/checker/ssh.go b/internal/task/task/checker/ssh.go index 9d0e893bd..c3ea3b2d9 100644 --- a/internal/task/task/checker/ssh.go +++ b/internal/task/task/checker/ssh.go @@ -73,7 +73,7 @@ func NewCheckSSHConnectTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) ( // new task method := utils.Choose(hc.GetForwardAgent(), "forwardAgent", "privateKey") subname := fmt.Sprintf("host=%s method=%s", dc.GetHost(), method) - t := task.NewTask("Check SSH Connect ", subname, hc.GetSSHConfig()) + t := task.NewTask("Check SSH Connect ", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task t.AddStep(&step.Lambda{ diff --git a/internal/task/task/checker/topology.go b/internal/task/task/checker/topology.go index 9b6c5185e..d28b030ca 100644 --- a/internal/task/task/checker/topology.go +++ b/internal/task/task/checker/topology.go @@ -263,7 +263,7 @@ func NewCheckTopologyTask(curveadm *cli.CurveAdm, null interface{}) (*task.Task, // new task dcs := curveadm.MemStorage().Get(comm.KEY_ALL_DEPLOY_CONFIGS).([]*topology.DeployConfig) subname := fmt.Sprintf("cluster=%s kind=%s", curveadm.ClusterName(), dcs[0].GetKind()) - t := task.NewTask("Check Topology ", subname, nil) + t := task.NewTask("Check Topology ", subname, nil, nil) // add step to task for _, dc := range dcs { diff --git a/internal/task/task/common/backup_etcd.go b/internal/task/task/common/backup_etcd.go index e9aaeb3b7..ba218df44 100644 --- a/internal/task/task/common/backup_etcd.go +++ b/internal/task/task/common/backup_etcd.go @@ -57,7 +57,7 @@ func NewBackupEtcdDataTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (* subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Backup Etcd Data", subname, hc.GetSSHConfig()) + t := task.NewTask("Backup Etcd Data", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) t.AddStep(&step.ContainerExec{ ContainerId: &containerId, diff --git a/internal/task/task/common/clean_service.go b/internal/task/task/common/clean_service.go index a3e94c87a..83f27e598 100644 --- a/internal/task/task/common/clean_service.go +++ b/internal/task/task/common/clean_service.go @@ -149,7 +149,7 @@ func NewCleanServiceTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*ta recycle := curveadm.MemStorage().Get(comm.KEY_CLEAN_BY_RECYCLE).(bool) subname := fmt.Sprintf("host=%s role=%s containerId=%s clean=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId), strings.Join(only, ",")) - t := task.NewTask("Clean Service", subname, hc.GetSSHConfig()) + t := task.NewTask("Clean Service", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task clean := utils.Slice2Map(only) diff --git a/internal/task/task/common/client_status.go b/internal/task/task/common/client_status.go index 87628814a..4ed1d2ce7 100644 --- a/internal/task/task/common/client_status.go +++ b/internal/task/task/common/client_status.go @@ -139,7 +139,7 @@ func (s *step2FormatClientStatus) Execute(ctx *context.Context) error { func NewInitClientStatusTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, error) { client := v.(storage.Client) - t := task.NewTask("Init Client Status", "", nil) + t := task.NewTask("Init Client Status", "", nil, nil) var cfgPath string t.AddStep(&step.Lambda{ @@ -164,7 +164,7 @@ func NewGetClientStatusTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, containerId := client.ContainerId subname := fmt.Sprintf("host=%s kind=%s containerId=%s", hc.GetHost(), client.Kind, tui.TrimContainerId(containerId)) - t := task.NewTask("Get Client Status", subname, hc.GetSSHConfig()) + t := task.NewTask("Get Client Status", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step var status string diff --git a/internal/task/task/common/collect_client.go b/internal/task/task/common/collect_client.go index f9085cf20..1d8363d77 100644 --- a/internal/task/task/common/collect_client.go +++ b/internal/task/task/common/collect_client.go @@ -47,7 +47,7 @@ func NewCollectClientTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, er containerId := client.ContainerId subname := fmt.Sprintf("host=%s kind=%s containerId=%s", hc.GetHost(), client.Kind, tui.TrimContainerId(containerId)) - t := task.NewTask("Collect Client", subname, hc.GetSSHConfig()) + t := task.NewTask("Collect Client", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/common/collect_curveadm.go b/internal/task/task/common/collect_curveadm.go index e093ac9df..36a3b88d2 100644 --- a/internal/task/task/common/collect_curveadm.go +++ b/internal/task/task/common/collect_curveadm.go @@ -40,7 +40,7 @@ func NewCollectCurveAdmTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) ( kind := dc.GetKind() subname := fmt.Sprintf("cluster=%s kind=%s", curveadm.ClusterName(), kind) - t := task.NewTask("Collect CurveAdm", subname, nil) + t := task.NewTask("Collect CurveAdm", subname, nil, nil) // add step to task dbPath := curveadm.Config().GetDBPath() diff --git a/internal/task/task/common/collect_service.go b/internal/task/task/common/collect_service.go index 9718d38a6..f5581dc1f 100644 --- a/internal/task/task/common/collect_service.go +++ b/internal/task/task/common/collect_service.go @@ -93,7 +93,7 @@ func NewCollectServiceTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (* // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Collect Service", subname, hc.GetSSHConfig()) + t := task.NewTask("Collect Service", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/common/create_container.go b/internal/task/task/common/create_container.go index 1a6d6676c..aacbf1089 100644 --- a/internal/task/task/common/create_container.go +++ b/internal/task/task/common/create_container.go @@ -214,7 +214,7 @@ func NewCreateContainerTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) ( // new task subname := fmt.Sprintf("host=%s role=%s", dc.GetHost(), dc.GetRole()) - t := task.NewTask("Create Container", subname, hc.GetSSHConfig()) + t := task.NewTask("Create Container", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var oldContainerId, containerId string diff --git a/internal/task/task/common/create_pool.go b/internal/task/task/common/create_pool.go index 3adbbc200..aac7f8fc5 100644 --- a/internal/task/task/common/create_pool.go +++ b/internal/task/task/common/create_pool.go @@ -200,7 +200,7 @@ func NewCreateTopologyTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (* "Create Logical Pool", "Create Physical Pool") subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask(name, subname, hc.GetSSHConfig()) + t := task.NewTask(name, subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var success bool diff --git a/internal/task/task/common/etcd_auth_enable.go b/internal/task/task/common/etcd_auth_enable.go index 9d3b71109..79280feac 100644 --- a/internal/task/task/common/etcd_auth_enable.go +++ b/internal/task/task/common/etcd_auth_enable.go @@ -63,7 +63,7 @@ func NewEnableEtcdAuthTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (* // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Enable Etcd Auth", subname, hc.GetSSHConfig()) + t := task.NewTask("Enable Etcd Auth", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) script := scripts.ENABLE_ETCD_AUTH layout := dc.GetProjectLayout() diff --git a/internal/task/task/common/init_support.go b/internal/task/task/common/init_support.go index c322cc4a2..baa4d5f44 100644 --- a/internal/task/task/common/init_support.go +++ b/internal/task/task/common/init_support.go @@ -37,7 +37,7 @@ func NewInitSupportTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*tas kind := dc.GetKind() subname := fmt.Sprintf("cluster=%s kind=%s", curveadm.ClusterName(), kind) - t := task.NewTask("Init Support", subname, nil) + t := task.NewTask("Init Support", subname, nil, nil) /* * 0d7a7103521da69c6331a96355142c3b diff --git a/internal/task/task/common/install_client.go b/internal/task/task/common/install_client.go index dcb691b12..baadda58d 100644 --- a/internal/task/task/common/install_client.go +++ b/internal/task/task/common/install_client.go @@ -142,7 +142,7 @@ func NewInstallClientTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig) (* release := getRelease(curveadm) subname := fmt.Sprintf("host=%s release=%s", host, release) name := utils.Choose(kind == KIND_CURVEBS, "CurveBS", "CurveFS") - t := task.NewTask(fmt.Sprintf("Install %s Client", name), subname, hc.GetSSHConfig()) + t := task.NewTask(fmt.Sprintf("Install %s Client", name), subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var input, output string diff --git a/internal/task/task/common/pull_image.go b/internal/task/task/common/pull_image.go index ee40f930d..855b815c9 100644 --- a/internal/task/task/common/pull_image.go +++ b/internal/task/task/common/pull_image.go @@ -41,7 +41,7 @@ func NewPullImageTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task. // new task subname := fmt.Sprintf("host=%s image=%s", dc.GetHost(), dc.GetContainerImage()) - t := task.NewTask("Pull Image", subname, hc.GetSSHConfig()) + t := task.NewTask("Pull Image", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task t.AddStep(&step.PullImage{ diff --git a/internal/task/task/common/restart_service.go b/internal/task/task/common/restart_service.go index 25915a7e3..2bec6e6e2 100644 --- a/internal/task/task/common/restart_service.go +++ b/internal/task/task/common/restart_service.go @@ -71,7 +71,7 @@ func NewRestartServiceTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (* // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Restart Service", subname, hc.GetSSHConfig()) + t := task.NewTask("Restart Service", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/common/service_status.go b/internal/task/task/common/service_status.go index 660fd68c7..3be7dc94f 100644 --- a/internal/task/task/common/service_status.go +++ b/internal/task/task/common/service_status.go @@ -229,7 +229,7 @@ func NewInitServiceStatusTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Init Service Status", subname, nil) + t := task.NewTask("Init Service Status", subname, nil, nil) t.AddStep(&step2InitStatus{ dc: dc, @@ -265,7 +265,7 @@ func NewGetServiceStatusTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Get Service Status", subname, hc.GetSSHConfig()) + t := task.NewTask("Get Service Status", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var status string diff --git a/internal/task/task/common/start_service.go b/internal/task/task/common/start_service.go index 7f592f23b..f421d3572 100644 --- a/internal/task/task/common/start_service.go +++ b/internal/task/task/common/start_service.go @@ -89,7 +89,7 @@ func NewStartServiceTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*ta // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Start Service", subname, hc.GetSSHConfig()) + t := task.NewTask("Start Service", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/common/stop_service.go b/internal/task/task/common/stop_service.go index 9600d9f99..8aee65529 100644 --- a/internal/task/task/common/stop_service.go +++ b/internal/task/task/common/stop_service.go @@ -73,7 +73,7 @@ func NewStopServiceTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*tas // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Stop Service", subname, hc.GetSSHConfig()) + t := task.NewTask("Stop Service", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/common/sync_config.go b/internal/task/task/common/sync_config.go index b247a56a5..8373e7c7b 100644 --- a/internal/task/task/common/sync_config.go +++ b/internal/task/task/common/sync_config.go @@ -103,7 +103,7 @@ func NewSyncConfigTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Sync Config", subname, hc.GetSSHConfig()) + t := task.NewTask("Sync Config", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/common/uninstall_client.go b/internal/task/task/common/uninstall_client.go index ce939832d..f02f7ed0d 100644 --- a/internal/task/task/common/uninstall_client.go +++ b/internal/task/task/common/uninstall_client.go @@ -81,7 +81,7 @@ func NewUninstallClientTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, kind := curveadm.MemStorage().Get(comm.KEY_CLIENT_KIND).(string) subname := fmt.Sprintf("host=%s release=%s kind=%s", host, release, kind) name := utils.Choose(kind == KIND_CURVEBS, "CurveBS", "CurveFS") - t := task.NewTask(fmt.Sprintf("Uninstall %s Client", name), subname, hc.GetSSHConfig()) + t := task.NewTask(fmt.Sprintf("Uninstall %s Client", name), subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task t.AddPostStep(&step2UninstallPackage{ diff --git a/internal/task/task/common/update_topology.go b/internal/task/task/common/update_topology.go index b89bb8a43..f693a047f 100644 --- a/internal/task/task/common/update_topology.go +++ b/internal/task/task/common/update_topology.go @@ -27,10 +27,10 @@ package common import ( "github.com/opencurve/curveadm/cli/cli" comm "github.com/opencurve/curveadm/internal/common" + "github.com/opencurve/curveadm/internal/errno" "github.com/opencurve/curveadm/internal/task/context" "github.com/opencurve/curveadm/internal/task/step" "github.com/opencurve/curveadm/internal/task/task" - "github.com/opencurve/curveadm/internal/errno" ) func updateTopology(curveadm *cli.CurveAdm) step.LambdaType { @@ -45,7 +45,7 @@ func updateTopology(curveadm *cli.CurveAdm) step.LambdaType { } func NewUpdateTopologyTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, error) { - t := task.NewTask("Update Topology", "", nil) + t := task.NewTask("Update Topology", "", nil, nil) // add step to task t.AddStep(&step.Lambda{ diff --git a/internal/task/task/fs/mount.go b/internal/task/task/fs/mount.go index 124d3f8ef..29be1c665 100644 --- a/internal/task/task/fs/mount.go +++ b/internal/task/task/fs/mount.go @@ -289,7 +289,7 @@ func NewMountFSTask(curveadm *cli.CurveAdm, cc *configure.ClientConfig) (*task.T mountFSName := options.MountFSName mountFSType := options.MountFSType subname := fmt.Sprintf("mountFSName=%s mountFSType=%s mountPoint=%s", mountFSName, mountFSType, mountPoint) - t := task.NewTask("Mount FileSystem", subname, hc.GetSSHConfig()) + t := task.NewTask("Mount FileSystem", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var containerId, out string diff --git a/internal/task/task/fs/umount.go b/internal/task/task/fs/umount.go index 9a1275cae..aaecd6407 100644 --- a/internal/task/task/fs/umount.go +++ b/internal/task/task/fs/umount.go @@ -131,7 +131,7 @@ func NewUmountFSTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, error) // new task mountPoint := options.MountPoint subname := fmt.Sprintf("host=%s mountPoint=%s", options.Host, mountPoint) - t := task.NewTask("Umount FileSystem", subname, hc.GetSSHConfig()) + t := task.NewTask("Umount FileSystem", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var status string diff --git a/internal/task/task/monitor/clean_container.go b/internal/task/task/monitor/clean_container.go index 1756be472..feee92a10 100644 --- a/internal/task/task/monitor/clean_container.go +++ b/internal/task/task/monitor/clean_container.go @@ -44,7 +44,7 @@ func NewCleanConfigContainerTask(curveadm *cli.CurveAdm, cfg *configure.MonitorC if err != nil { return nil, err } - t := task.NewTask("Clean Config Container", "", hc.GetSSHConfig()) + t := task.NewTask("Clean Config Container", "", hc.GetSSHConfig(), hc.GetHttpConfig()) t.AddStep(&common.Step2CleanContainer{ ServiceId: serviceId, ContainerId: containerId, diff --git a/internal/task/task/monitor/clean_service.go b/internal/task/task/monitor/clean_service.go index 26e928dc4..316ee6f81 100644 --- a/internal/task/task/monitor/clean_service.go +++ b/internal/task/task/monitor/clean_service.go @@ -73,7 +73,7 @@ func NewCleanMonitorTask(curveadm *cli.CurveAdm, cfg *configure.MonitorConfig) ( only := curveadm.MemStorage().Get(comm.KEY_CLEAN_ITEMS).([]string) subname := fmt.Sprintf("host=%s role=%s containerId=%s clean=%s", cfg.GetHost(), cfg.GetRole(), tui.TrimContainerId(containerId), strings.Join(only, ",")) - t := task.NewTask("Clean Monitor", subname, hc.GetSSHConfig()) + t := task.NewTask("Clean Monitor", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task clean := utils.Slice2Map(only) diff --git a/internal/task/task/monitor/create_container.go b/internal/task/task/monitor/create_container.go index a01f3e514..ccf6382d8 100644 --- a/internal/task/task/monitor/create_container.go +++ b/internal/task/task/monitor/create_container.go @@ -120,7 +120,7 @@ func NewCreateContainerTask(curveadm *cli.CurveAdm, cfg *configure.MonitorConfig // new task subname := fmt.Sprintf("host=%s role=%s", host, cfg.GetRole()) - t := task.NewTask("Create Container", subname, hc.GetSSHConfig()) + t := task.NewTask("Create Container", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var oldContainerId, containerId string diff --git a/internal/task/task/monitor/pull_image.go b/internal/task/task/monitor/pull_image.go index 80883e43d..ebe87ae86 100644 --- a/internal/task/task/monitor/pull_image.go +++ b/internal/task/task/monitor/pull_image.go @@ -41,7 +41,7 @@ func NewPullImageTask(curveadm *cli.CurveAdm, cfg *configure.MonitorConfig) (*ta // new task subname := fmt.Sprintf("host=%s image=%s", host, image) - t := task.NewTask("Pull Image", subname, hc.GetSSHConfig()) + t := task.NewTask("Pull Image", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task t.AddStep(&step.PullImage{ Image: image, diff --git a/internal/task/task/monitor/restart_service.go b/internal/task/task/monitor/restart_service.go index 25be8d924..726056ba2 100644 --- a/internal/task/task/monitor/restart_service.go +++ b/internal/task/task/monitor/restart_service.go @@ -49,7 +49,7 @@ func NewRestartServiceTask(curveadm *cli.CurveAdm, cfg *configure.MonitorConfig) // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", cfg.GetHost(), cfg.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Restart Monitor Service", subname, hc.GetSSHConfig()) + t := task.NewTask("Restart Monitor Service", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/monitor/start_service.go b/internal/task/task/monitor/start_service.go index 7391e1aaf..71c55faa8 100644 --- a/internal/task/task/monitor/start_service.go +++ b/internal/task/task/monitor/start_service.go @@ -59,7 +59,7 @@ func NewStartServiceTask(curveadm *cli.CurveAdm, cfg *configure.MonitorConfig) ( // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", cfg.GetHost(), cfg.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Start Service", subname, hc.GetSSHConfig()) + t := task.NewTask("Start Service", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/monitor/status_service.go b/internal/task/task/monitor/status_service.go index 9c9c36323..6908a8710 100644 --- a/internal/task/task/monitor/status_service.go +++ b/internal/task/task/monitor/status_service.go @@ -126,7 +126,7 @@ func NewInitMonitorStatusTask(curveadm *cli.CurveAdm, cfg *configure.MonitorConf subname := fmt.Sprintf("host=%s role=%s containerId=%s", cfg.GetHost(), cfg.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Init Monitor Status", subname, nil) + t := task.NewTask("Init Monitor Status", subname, nil, nil) t.AddStep(&step2InitMonitorStatus{ mc: cfg, @@ -154,7 +154,7 @@ func NewGetMonitorStatusTask(curveadm *cli.CurveAdm, cfg *configure.MonitorConfi // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", cfg.GetHost(), cfg.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Get Monitor Status", subname, hc.GetSSHConfig()) + t := task.NewTask("Get Monitor Status", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var status string diff --git a/internal/task/task/monitor/stop_service.go b/internal/task/task/monitor/stop_service.go index 126a5801d..221cb3e52 100644 --- a/internal/task/task/monitor/stop_service.go +++ b/internal/task/task/monitor/stop_service.go @@ -49,7 +49,7 @@ func NewStopServiceTask(curveadm *cli.CurveAdm, cfg *configure.MonitorConfig) (* // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", cfg.GetHost(), cfg.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Stop Service", subname, hc.GetSSHConfig()) + t := task.NewTask("Stop Service", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string diff --git a/internal/task/task/monitor/sync_config.go b/internal/task/task/monitor/sync_config.go index 6393d2797..5ed9824cb 100644 --- a/internal/task/task/monitor/sync_config.go +++ b/internal/task/task/monitor/sync_config.go @@ -72,7 +72,7 @@ func NewSyncConfigTask(curveadm *cli.CurveAdm, cfg *configure.MonitorConfig) (*t // new task subname := fmt.Sprintf("host=%s role=%s containerId=%s", cfg.GetHost(), cfg.GetRole(), tui.TrimContainerId(containerId)) - t := task.NewTask("Sync Config", subname, hc.GetSSHConfig()) + t := task.NewTask("Sync Config", subname, hc.GetSSHConfig(), hc.GetHttpConfig()) // add step to task var out string t.AddStep(&step.ListContainers{ // gurantee container exist diff --git a/internal/task/task/playground/create.go b/internal/task/task/playground/create.go index 68b0823b6..05e534d57 100644 --- a/internal/task/task/playground/create.go +++ b/internal/task/task/playground/create.go @@ -100,7 +100,7 @@ func NewCreatePlaygroundTask(curveadm *cli.CurveAdm, cfg *configure.PlaygroundCo // new task subname := fmt.Sprintf("kind=%s name=%s image=%s", kind, name, containerImage) - t := task.NewTask("Create Playground", subname, nil) + t := task.NewTask("Create Playground", subname, nil, nil) var containerId string // add step to task diff --git a/internal/task/task/playground/init.go b/internal/task/task/playground/init.go index a2bcfc0f3..9360fd532 100644 --- a/internal/task/task/playground/init.go +++ b/internal/task/task/playground/init.go @@ -106,7 +106,7 @@ func NewInitPlaygroundTask(curveadm *cli.CurveAdm, cfg *configure.PlaygroundConf kind := cfg.GetKind() name := cfg.GetName() subname := fmt.Sprintf("kind=%s name=%s", kind, name) - t := task.NewTask("Init Playground", subname, nil) + t := task.NewTask("Init Playground", subname, nil, nil) // add step to task var containerId string diff --git a/internal/task/task/playground/list.go b/internal/task/task/playground/list.go index 99dd534c1..1ce567985 100644 --- a/internal/task/task/playground/list.go +++ b/internal/task/task/playground/list.go @@ -83,7 +83,7 @@ func NewGetPlaygroundStatusTask(curveadm *cli.CurveAdm, v interface{}) (*task.Ta // new task playground := v.(storage.Playground) subname := fmt.Sprintf("id=%d name=%s", playground.Id, playground.Name) - t := task.NewTask("Get Playground Status", subname, nil) + t := task.NewTask("Get Playground Status", subname, nil, nil) // add step to task var status string diff --git a/internal/task/task/playground/remove.go b/internal/task/task/playground/remove.go index df6bdf647..4d92a8424 100644 --- a/internal/task/task/playground/remove.go +++ b/internal/task/task/playground/remove.go @@ -95,7 +95,7 @@ func NewRemovePlaygroundTask(curveadm *cli.CurveAdm, v interface{}) (*task.Task, // new task playground := v.(storage.Playground) subname := fmt.Sprintf("name=%s", playground.Name) - t := task.NewTask("Remove Playground", subname, nil) + t := task.NewTask("Remove Playground", subname, nil, nil) // add step to task var containerId string diff --git a/internal/task/task/playground/start.go b/internal/task/task/playground/start.go index 535fc4915..bbbc2ba83 100644 --- a/internal/task/task/playground/start.go +++ b/internal/task/task/playground/start.go @@ -43,7 +43,7 @@ func wait(seconds int) step.LambdaType { func NewStartPlaygroundTask(curveadm *cli.CurveAdm, cfg *configure.PlaygroundConfig) (*task.Task, error) { // new task subname := fmt.Sprintf("kind=%s name=%s", cfg.GetKind(), cfg.GetName()) - t := task.NewTask("Start Playground", subname, nil) + t := task.NewTask("Start Playground", subname, nil, nil) // add step to task containerId := cfg.GetName() diff --git a/internal/task/task/task.go b/internal/task/task/task.go index 47a6fa6f2..be836c006 100644 --- a/internal/task/task/task.go +++ b/internal/task/task/task.go @@ -44,25 +44,27 @@ type ( } Task struct { - tid string // task id - ptid string // parent task id - name string - subname string - steps []Step - postSteps []Step - sshConfig *module.SSHConfig - context context.Context + tid string // task id + ptid string // parent task id + name string + subname string + steps []Step + postSteps []Step + sshConfig *module.SSHConfig + httpConfig *module.HttpConfig + context context.Context } ) -func NewTask(name, subname string, sshConfig *module.SSHConfig) *Task { +func NewTask(name, subname string, sshConfig *module.SSHConfig, httpConfig *module.HttpConfig) *Task { tid := uuid.NewString()[:12] return &Task{ - tid: tid, - ptid: tid, - name: name, - subname: subname, - sshConfig: sshConfig, + tid: tid, + ptid: tid, + name: name, + subname: subname, + sshConfig: sshConfig, + httpConfig: httpConfig, } } @@ -112,16 +114,22 @@ func (t *Task) executePost(ctx *context.Context) { } func (t *Task) Execute() error { - var sshClient *module.SSHClient + var remoteClient module.RemoteClient if t.sshConfig != nil { client, err := module.NewSSHClient(*t.sshConfig) if err != nil { return errno.ERR_SSH_CONNECT_FAILED.E(err) } - sshClient = client + remoteClient = client + } else if t.httpConfig != nil { + client, err := module.NewHttpClient(*t.httpConfig) + if err != nil { + return errno.ERR_HTTP_CONNECT_FAILED.E(err) + } + remoteClient = client } - ctx, err := context.NewContext(sshClient) + ctx, err := context.NewContext(remoteClient) if err != nil { return err } diff --git a/internal/tui/hosts.go b/internal/tui/hosts.go index 9a19ba983..67366c89c 100644 --- a/internal/tui/hosts.go +++ b/internal/tui/hosts.go @@ -45,6 +45,8 @@ func FormatHosts(hcs []*configure.HostConfig, verbose bool) string { "Hostname", "User", "Port", + "Protocol", + "HTTP Port", "Private Key File", "Forward Agent", "Become User", @@ -60,8 +62,10 @@ func FormatHosts(hcs []*configure.HostConfig, verbose bool) string { host := hc.GetHost() hostname := hc.GetHostname() + protocol := hc.GetProtocol() user := hc.GetUser() port := strconv.Itoa(hc.GetSSHPort()) + httpPort := strconv.Itoa(hc.GetHTTPPort()) forwardAgent := utils.Choose(hc.GetForwardAgent(), "Y", "N") becomeUser := utils.Choose(len(hc.GetBecomeUser()) > 0, hc.GetBecomeUser(), "-") labels := utils.Choose(len(hc.GetLabels()) > 0, strings.Join(hc.GetLabels(), ","), "-") @@ -78,6 +82,8 @@ func FormatHosts(hcs []*configure.HostConfig, verbose bool) string { hostname, user, port, + protocol, + httpPort, privateKeyFile, forwardAgent, becomeUser, diff --git a/pkg/module/docker_cli.go b/pkg/module/docker_cli.go index 43974faf8..926fc5fef 100644 --- a/pkg/module/docker_cli.go +++ b/pkg/module/docker_cli.go @@ -49,18 +49,18 @@ const ( ) type DockerCli struct { - sshClient *SSHClient - options []string - tmpl *template.Template - data map[string]interface{} + options []string + tmpl *template.Template + data map[string]interface{} + remoteClient RemoteClient } -func NewDockerCli(sshClient *SSHClient) *DockerCli { +func NewDockerCli(remoteClient RemoteClient) *DockerCli { return &DockerCli{ - sshClient: sshClient, - options: []string{}, - tmpl: nil, - data: map[string]interface{}{}, + remoteClient: remoteClient, + options: []string{}, + tmpl: nil, + data: map[string]interface{}{}, } } @@ -72,7 +72,7 @@ func (s *DockerCli) AddOption(format string, args ...interface{}) *DockerCli { func (cli *DockerCli) Execute(options ExecOptions) (string, error) { cli.data["options"] = strings.Join(cli.options, " ") cli.data["engine"] = options.ExecWithEngine - return execCommand(cli.sshClient, cli.tmpl, cli.data, options) + return execCommand(cli.remoteClient, cli.tmpl, cli.data, options) } func (cli *DockerCli) DockerInfo() *DockerCli { diff --git a/pkg/module/file.go b/pkg/module/file.go index b7d686420..4a8f49062 100644 --- a/pkg/module/file.go +++ b/pkg/module/file.go @@ -41,35 +41,36 @@ var ( ) type FileManager struct { - sshClient *SSHClient + remoteClient RemoteClient } -func NewFileManager(sshClient *SSHClient) *FileManager { - return &FileManager{sshClient: sshClient} +func NewFileManager(remoteClient RemoteClient) *FileManager { + return &FileManager{remoteClient: remoteClient} } func (f *FileManager) Upload(localPath, remotePath string) error { - if f.sshClient == nil { + if f.remoteClient == nil { return ERR_UNREACHED } - err := f.sshClient.Client().Upload(localPath, remotePath) + err := f.remoteClient.Upload(localPath, remotePath) log.SwitchLevel(err)("UploadFile", - log.Field("remoteAddress", remoteAddr(f.sshClient)), + log.Field("remoteAddress", remoteAddr(f.remoteClient)), log.Field("localPath", localPath), log.Field("remotePath", remotePath), - log.Field("error", err)) + log.Field("error", err), + log.Field("protocol", f.remoteClient.Protocol())) return err } func (f *FileManager) Download(remotePath, localPath string) error { - if f.sshClient == nil { + if f.remoteClient == nil { return ERR_UNREACHED } - err := f.sshClient.Client().Download(remotePath, localPath) + err := f.remoteClient.Download(remotePath, localPath) log.SwitchLevel(err)("DownloadFile", - log.Field("remoteAddress", remoteAddr(f.sshClient)), + log.Field("remoteAddress", remoteAddr(f.remoteClient)), log.Field("remotePath", remotePath), log.Field("localPath", localPath), log.Field("error", err)) diff --git a/pkg/module/http.go b/pkg/module/http.go new file mode 100644 index 000000000..51213b7b8 --- /dev/null +++ b/pkg/module/http.go @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2021 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package module + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + log "github.com/opencurve/curveadm/pkg/log/glg" + "io" + "mime/multipart" + "net/http" + "net/url" + "os" +) + +const ( + HTTP_PROTOCOL = "http" +) + +type ( + HttpConfig struct { + Host string + Port uint + } + + HttpClient struct { + config HttpConfig + client *http.Client + } + + HttpResult struct { + Data string `json:"data"` + ErrorCode string `json:"errorCode"` + ErrorMsg string `json:"errorMsg"` + } +) + +func (client *HttpClient) Protocol() string { + return HTTP_PROTOCOL +} + +func (client *HttpClient) WrapperCommand(command string, execInLocal bool) (wrapperCmd string) { + return command +} + +func (client *HttpClient) RunCommand(ctx context.Context, command string) (out []byte, err error) { + data := make(map[string]interface{}) + data["command"] = command + bytesData, _ := json.Marshal(data) + + baseURL, _ := url.Parse(fmt.Sprintf("http://%s:%d", client.config.Host, client.config.Port)) + params := url.Values{} + params.Add("method", "cluster.deploy.cmd") + baseURL.RawQuery = params.Encode() + resp, err := client.client.Post(baseURL.String(), "application/json", bytes.NewReader(bytesData)) + if err != nil { + return + } + respData, err := io.ReadAll(resp.Body) + if err != nil { + return + } + result := &HttpResult{} + err = json.Unmarshal(respData, result) + if err != nil { + return + } + + log.Info("http resp", log.Field("result", result)) + + if result.ErrorCode != "0" { + return []byte(result.Data), fmt.Errorf(result.ErrorMsg) + } + + return []byte(result.Data), nil +} + +func (client *HttpClient) RemoteAddr() (addr string) { + config := client.Config() + return fmt.Sprintf("%s:%d", config.Host, config.Port) +} + +func (client *HttpClient) Upload(localPath string, remotePath string) (err error) { + bodyBuf := &bytes.Buffer{} + bodyWriter := multipart.NewWriter(bodyBuf) + fh, err := os.Open(localPath) + if err != nil { + return err + } + defer fh.Close() + fileWriter, err := bodyWriter.CreateFormFile("file", localPath) + if err != nil { + return err + } + _, err = io.Copy(fileWriter, fh) + if err != nil { + return err + } + + baseURL, _ := url.Parse(fmt.Sprintf("http://%s:%d", client.config.Host, client.config.Port)) + params := url.Values{} + params.Add("method", "cluster.deploy.upload") + baseURL.RawQuery = params.Encode() + boundary := "--boundary" + bodyWriter.SetBoundary(boundary) + bodyWriter.WriteField("filepath", remotePath) + bodyWriter.Close() + resp, err := client.client.Post(baseURL.String(), bodyWriter.FormDataContentType(), bodyBuf) + if err != nil { + return err + } + defer resp.Body.Close() + return err +} + +func (client *HttpClient) Download(remotePath string, localPath string) (err error) { + baseURL, _ := url.Parse(fmt.Sprintf("http://%s:%d", client.config.Host, client.config.Port)) + params := url.Values{} + params.Add("method", "cluster.deploy.download") + params.Add("filepath", remotePath) + baseURL.RawQuery = params.Encode() + resp, err := client.client.Get(baseURL.String()) + if err != nil { + return + } + defer resp.Body.Close() + localFile, err := os.Create(localPath) + if err != nil { + return + } + defer localFile.Close() + _, err = io.Copy(localFile, resp.Body) + return +} + +func (client *HttpClient) Close() { + +} + +func (client *HttpClient) Config() HttpConfig { + return client.config +} + +func NewHttpClient(config HttpConfig) (*HttpClient, error) { + return &HttpClient{ + config: config, + client: &http.Client{}, + }, nil +} diff --git a/pkg/module/module.go b/pkg/module/module.go index 5c0571f82..5ff2bbf8b 100644 --- a/pkg/module/module.go +++ b/pkg/module/module.go @@ -33,13 +33,12 @@ import ( "text/template" "time" - "github.com/melbahja/goph" log "github.com/opencurve/curveadm/pkg/log/glg" ) type ( Module struct { - sshClient *SSHClient + remoteClient RemoteClient } ExecOptions struct { @@ -60,33 +59,31 @@ func (e *TimeoutError) Error() string { e.timeout) } -func NewModule(sshClient *SSHClient) *Module { - return &Module{sshClient: sshClient} +func NewModule(remoteClient RemoteClient) *Module { + return &Module{remoteClient: remoteClient} } func (m *Module) Shell() *Shell { - return NewShell(m.sshClient) + return NewShell(m.remoteClient) } func (m *Module) File() *FileManager { - return NewFileManager(m.sshClient) + return NewFileManager(m.remoteClient) } func (m *Module) DockerCli() *DockerCli { - return NewDockerCli(m.sshClient) + return NewDockerCli(m.remoteClient) } // common utils -func remoteAddr(client *SSHClient) string { +func remoteAddr(client RemoteClient) string { if client == nil { return "-" } - - config := client.Config() - return fmt.Sprintf("%s@%s:%d", config.User, config.Host, config.Port) + return client.RemoteAddr() } -func execCommand(sshClient *SSHClient, +func execCommand(remoteClient RemoteClient, tmpl *template.Template, data map[string]interface{}, options ExecOptions) (string, error) { @@ -108,14 +105,8 @@ func execCommand(sshClient *SSHClient, command = strings.TrimLeft(command, " ") // (3) handle 'become_user' - if sshClient != nil { - becomeMethod := sshClient.Config().BecomeMethod - becomeFlags := sshClient.Config().BecomeFlags - becomeUser := sshClient.Config().BecomeUser - if len(becomeUser) > 0 && !options.ExecInLocal { - become := strings.Join([]string{becomeMethod, becomeFlags, becomeUser}, " ") - command = strings.Join([]string{become, command}, " ") - } + if remoteClient != nil { + command = remoteClient.WrapperCommand(command, options.ExecInLocal) } // (4) create context for timeout @@ -134,11 +125,7 @@ func execCommand(sshClient *SSHClient, cmd.Env = []string{"LANG=en_US.UTF-8"} out, err = cmd.CombinedOutput() } else { - var cmd *goph.Cmd - cmd, err = sshClient.Client().CommandContext(ctx, command) - if err == nil { - out, err = cmd.CombinedOutput() - } + out, err = remoteClient.RunCommand(ctx, command) } if ctx.Err() == context.DeadlineExceeded { @@ -146,7 +133,7 @@ func execCommand(sshClient *SSHClient, } log.SwitchLevel(err)("Execute command", - log.Field("remoteAddr", remoteAddr(sshClient)), + log.Field("remoteAddr", remoteAddr(remoteClient)), log.Field("command", command), log.Field("output", strings.TrimSuffix(string(out), "\n")), log.Field("error", err)) diff --git a/pkg/module/remote_client.go b/pkg/module/remote_client.go new file mode 100644 index 000000000..93dcdcac3 --- /dev/null +++ b/pkg/module/remote_client.go @@ -0,0 +1,15 @@ +package module + +import ( + "context" +) + +type RemoteClient interface { + Protocol() string + WrapperCommand(command string, execInLocal bool) (wrapperCmd string) + RunCommand(ctx context.Context, command string) (out []byte, err error) + RemoteAddr() (addr string) + Upload(localPath string, remotePath string) (err error) + Download(remotePath string, localPath string) (err error) + Close() +} diff --git a/pkg/module/shell.go b/pkg/module/shell.go index 9be3379dd..9c7114cc2 100644 --- a/pkg/module/shell.go +++ b/pkg/module/shell.go @@ -79,18 +79,18 @@ const ( // TODO(P1): support command pipe type Shell struct { - sshClient *SSHClient - options []string - tmpl *template.Template - data map[string]interface{} + remoteClient RemoteClient + options []string + tmpl *template.Template + data map[string]interface{} } -func NewShell(sshClient *SSHClient) *Shell { +func NewShell(remoteClient RemoteClient) *Shell { return &Shell{ - sshClient: sshClient, - options: []string{}, - tmpl: nil, - data: map[string]interface{}{}, + remoteClient: remoteClient, + options: []string{}, + tmpl: nil, + data: map[string]interface{}{}, } } @@ -111,7 +111,7 @@ func (s *Shell) String() (string, error) { func (s *Shell) Execute(options ExecOptions) (string, error) { s.data["options"] = strings.Join(s.options, " ") - return execCommand(s.sshClient, s.tmpl, s.data, options) + return execCommand(s.remoteClient, s.tmpl, s.data, options) } // text diff --git a/pkg/module/ssh.go b/pkg/module/ssh.go index 4e8abf889..990aab263 100644 --- a/pkg/module/ssh.go +++ b/pkg/module/ssh.go @@ -25,8 +25,11 @@ package module import ( + "context" "errors" + "fmt" "net" + "strings" "time" "github.com/melbahja/goph" @@ -34,6 +37,10 @@ import ( "golang.org/x/crypto/ssh" ) +const ( + SSH_PROTOCOL = "ssh" +) + type ( SSHConfig struct { User string @@ -151,3 +158,45 @@ connect: config: config, }, err } + +func (client *SSHClient) WrapperCommand(command string, execInLocal bool) string { + becomeMethod := client.Config().BecomeMethod + becomeFlags := client.Config().BecomeFlags + becomeUser := client.Config().BecomeUser + if len(becomeUser) > 0 && !execInLocal { + become := strings.Join([]string{becomeMethod, becomeFlags, becomeUser}, " ") + command = strings.Join([]string{become, command}, " ") + } + return command +} + +func (client *SSHClient) RunCommand(ctx context.Context, command string) (out []byte, err error) { + var cmd *goph.Cmd + cmd, err = client.Client().CommandContext(ctx, command) + if err == nil { + cmd.Env = []string{"LANG=en_US.UTF-8"} + out, err = cmd.CombinedOutput() + } + return +} + +func (client *SSHClient) RemoteAddr() (addr string) { + config := client.Config() + return fmt.Sprintf("%s@%s:%d", config.User, config.Host, config.Port) +} + +func (client *SSHClient) Upload(localPath string, remotePath string) (err error) { + return client.client.Upload(localPath, remotePath) +} + +func (client *SSHClient) Download(remotePath string, localPath string) (err error) { + return client.client.Download(remotePath, localPath) +} + +func (client *SSHClient) Close() { + client.client.Close() +} + +func (client *SSHClient) Protocol() string { + return SSH_PROTOCOL +}