diff --git a/components/cluster/command/transfer.go b/components/cluster/command/transfer.go index b6c027ef95..210853f49f 100644 --- a/components/cluster/command/transfer.go +++ b/components/cluster/command/transfer.go @@ -49,6 +49,7 @@ func newPullCmd() *cobra.Command { cmd.Flags().StringSliceVarP(&gOpt.Roles, "role", "R", nil, "Only exec on host with specified roles") cmd.Flags().StringSliceVarP(&gOpt.Nodes, "node", "N", nil, "Only exec on host with specified nodes") cmd.Flags().IntVarP(&opt.Limit, "limit", "l", 0, "Limits the used bandwidth, specified in Kbit/s") + cmd.Flags().BoolVar(&opt.Compress, "compress", false, "Compression enable. Passes the -C flag to ssh(1) to enable compression.") return cmd } diff --git a/components/dm/ansible/import.go b/components/dm/ansible/import.go index f6b83dffc4..5613347486 100644 --- a/components/dm/ansible/import.go +++ b/components/dm/ansible/import.go @@ -180,7 +180,7 @@ func (im *Importer) fetchFile(ctx context.Context, host string, port int, fname tmp = filepath.Join(tmp, filepath.Base(fname)) - err = e.Transfer(ctx, fname, tmp, true /*download*/, 0) + err = e.Transfer(ctx, fname, tmp, true /*download*/, 0, false) if err != nil { return nil, errors.Annotatef(err, "transfer %s from %s:%d", fname, host, port) } @@ -254,7 +254,7 @@ func (im *Importer) ScpSourceToMaster(ctx context.Context, topo *spec.Specificat return errors.AddStack(err) } - err = e.Transfer(ctx, f.Name(), filepath.Join(target, addr+".yml"), false, 0) + err = e.Transfer(ctx, f.Name(), filepath.Join(target, addr+".yml"), false, 0, false) if err != nil { return err } diff --git a/components/dm/ansible/import_test.go b/components/dm/ansible/import_test.go index f49dea70ab..f3223ad26f 100644 --- a/components/dm/ansible/import_test.go +++ b/components/dm/ansible/import_test.go @@ -48,13 +48,13 @@ func (g *executorGetter) Get(host string) ctxt.Executor { // Transfer implements executor interface. // Replace the deploy directory as the local one in testdata, so we can fetch it. -func (l *localExecutor) Transfer(ctx context.Context, src, target string, download bool, limit int) error { +func (l *localExecutor) Transfer(ctx context.Context, src, target string, download bool, limit int, _ bool) error { mydeploy, err := filepath.Abs("./testdata/deploy_dir/" + l.host) if err != nil { return errors.AddStack(err) } src = strings.Replace(src, "/home/tidb/deploy", mydeploy, 1) - return l.Local.Transfer(ctx, src, target, download, 0) + return l.Local.Transfer(ctx, src, target, download, 0, false) } func TestParseRunScript(t *testing.T) { diff --git a/components/dm/spec/logic.go b/components/dm/spec/logic.go index f54b71da70..f3d7d50c12 100644 --- a/components/dm/spec/logic.go +++ b/components/dm/spec/logic.go @@ -135,7 +135,7 @@ func (i *MasterInstance) InitConfig( return err } dst := filepath.Join(paths.Deploy, "scripts", "run_dm-master.sh") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil { @@ -177,7 +177,7 @@ func (i *MasterInstance) ScaleConfig( } dst := filepath.Join(paths.Deploy, "scripts", "run_dm-master.sh") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil { @@ -266,7 +266,7 @@ func (i *WorkerInstance) InitConfig( } dst := filepath.Join(paths.Deploy, "scripts", "run_dm-worker.sh") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } diff --git a/pkg/cluster/ansible/config.go b/pkg/cluster/ansible/config.go index f1e7d1e074..cfb3679eec 100644 --- a/pkg/cluster/ansible/config.go +++ b/pkg/cluster/ansible/config.go @@ -80,7 +80,8 @@ func ImportConfig(name string, clsMeta *spec.ClusterMeta, gOpt operator.Options) inst.GetPort())), inst.GetHost(), true, - 0). + 0, + false). Build() copyFileTasks = append(copyFileTasks, t) case spec.ComponentTiFlash: @@ -113,7 +114,8 @@ func ImportConfig(name string, clsMeta *spec.ClusterMeta, gOpt operator.Options) inst.GetPort())), inst.GetHost(), true, - 0). + 0, + false). CopyFile(filepath.Join(inst.DeployDir(), "conf", inst.ComponentName()+"-learner.toml"), spec.ClusterPath(name, spec.AnsibleImportedConfigPath, @@ -123,7 +125,8 @@ func ImportConfig(name string, clsMeta *spec.ClusterMeta, gOpt operator.Options) inst.GetPort())), inst.GetHost(), true, - 0). + 0, + false). Build() copyFileTasks = append(copyFileTasks, t) default: diff --git a/pkg/cluster/ctxt/context.go b/pkg/cluster/ctxt/context.go index a53ea532cc..219a877dd9 100644 --- a/pkg/cluster/ctxt/context.go +++ b/pkg/cluster/ctxt/context.go @@ -46,7 +46,7 @@ type ( Execute(ctx context.Context, cmd string, sudo bool, timeout ...time.Duration) (stdout []byte, stderr []byte, err error) // Transfer copies files from or to a target - Transfer(ctx context.Context, src, dst string, download bool, limit int) error + Transfer(ctx context.Context, src, dst string, download bool, limit int, compress bool) error } // ExecutorGetter get the executor by host. diff --git a/pkg/cluster/executor/checkpoint.go b/pkg/cluster/executor/checkpoint.go index 4fa5d828a5..c29ac930ab 100644 --- a/pkg/cluster/executor/checkpoint.go +++ b/pkg/cluster/executor/checkpoint.go @@ -86,7 +86,7 @@ func (c *CheckPointExecutor) Execute(ctx context.Context, cmd string, sudo bool, } // Transfer implements Executer interface. -func (c *CheckPointExecutor) Transfer(ctx context.Context, src, dst string, download bool, limit int) (err error) { +func (c *CheckPointExecutor) Transfer(ctx context.Context, src, dst string, download bool, limit int, compress bool) (err error) { point := checkpoint.Acquire(ctx, scpPoint, map[string]interface{}{ "host": c.config.Host, "port": c.config.Port, @@ -95,6 +95,7 @@ func (c *CheckPointExecutor) Transfer(ctx context.Context, src, dst string, down "dst": dst, "download": download, "limit": limit, + "compress": compress, }) defer func() { point.Release(err, @@ -109,5 +110,5 @@ func (c *CheckPointExecutor) Transfer(ctx context.Context, src, dst string, down return nil } - return c.Executor.Transfer(ctx, src, dst, download, limit) + return c.Executor.Transfer(ctx, src, dst, download, limit, compress) } diff --git a/pkg/cluster/executor/local.go b/pkg/cluster/executor/local.go index a3916267ec..9471e9552c 100644 --- a/pkg/cluster/executor/local.go +++ b/pkg/cluster/executor/local.go @@ -100,7 +100,7 @@ func (l *Local) Execute(ctx context.Context, cmd string, sudo bool, timeout ...t } // Transfer implements Executer interface. -func (l *Local) Transfer(ctx context.Context, src, dst string, download bool, limit int) error { +func (l *Local) Transfer(ctx context.Context, src, dst string, download bool, limit int, _ bool) error { targetPath := filepath.Dir(dst) if err := utils.CreateDir(targetPath); err != nil { return err diff --git a/pkg/cluster/executor/local_test.go b/pkg/cluster/executor/local_test.go index 321a1cd66f..870b5e6e12 100644 --- a/pkg/cluster/executor/local_test.go +++ b/pkg/cluster/executor/local_test.go @@ -54,7 +54,7 @@ func TestLocal(t *testing.T) { defer os.Remove(dst.Name()) // Transfer src to dst and check it. - err = local.Transfer(ctx, src.Name(), dst.Name(), false, 0) + err = local.Transfer(ctx, src.Name(), dst.Name(), false, 0, false) assert.Nil(err) data, err := os.ReadFile(dst.Name()) diff --git a/pkg/cluster/executor/scp.go b/pkg/cluster/executor/scp.go index 84e1f209ac..2e1cc39d87 100644 --- a/pkg/cluster/executor/scp.go +++ b/pkg/cluster/executor/scp.go @@ -29,7 +29,7 @@ import ( // ScpDownload downloads a file from remote with SCP // The implementation is partially inspired by github.com/dtylman/scp -func ScpDownload(session *ssh.Session, client *ssh.Client, src, dst string, limit int) error { +func ScpDownload(session *ssh.Session, client *ssh.Client, src, dst string, limit int, compress bool) error { r, err := session.StdoutPipe() if err != nil { return err @@ -97,10 +97,15 @@ func ScpDownload(session *ssh.Session, client *ssh.Client, src, dst string, limi copyErrC <- copyF() }() - remoteCmd := fmt.Sprintf("scp -f %s", src) + remoteArgs := make([]string, 0) + if compress { + remoteArgs = append(remoteArgs, "-C") + } if limit > 0 { - remoteCmd = fmt.Sprintf("scp -l %d -f %s", limit, src) + remoteArgs = append(remoteArgs, fmt.Sprintf("-l %d", limit)) } + remoteCmd := fmt.Sprintf("scp %s -f %s", strings.Join(remoteArgs, " "), src) + err = session.Start(remoteCmd) if err != nil { return err diff --git a/pkg/cluster/executor/ssh.go b/pkg/cluster/executor/ssh.go index 94aeb3af7b..2e6b8f88e3 100644 --- a/pkg/cluster/executor/ssh.go +++ b/pkg/cluster/executor/ssh.go @@ -200,7 +200,7 @@ func (e *EasySSHExecutor) Execute(ctx context.Context, cmd string, sudo bool, ti // This function depends on `scp` (a tool from OpenSSH or other SSH implementation) // This function is based on easyssh.MakeConfig.Scp() but with support of copying // file from remote to local. -func (e *EasySSHExecutor) Transfer(ctx context.Context, src, dst string, download bool, limit int) error { +func (e *EasySSHExecutor) Transfer(ctx context.Context, src, dst string, download bool, limit int, compress bool) error { if !download { err := e.Config.Scp(src, dst) if err != nil { @@ -217,7 +217,7 @@ func (e *EasySSHExecutor) Transfer(ctx context.Context, src, dst string, downloa defer client.Close() defer session.Close() - return ScpDownload(session, client, src, dst, limit) + return ScpDownload(session, client, src, dst, limit, compress) } func (e *NativeSSHExecutor) prompt(def string) string { @@ -354,7 +354,7 @@ func (e *NativeSSHExecutor) Execute(ctx context.Context, cmd string, sudo bool, // Transfer copies files via SCP // This function depends on `scp` (a tool from OpenSSH or other SSH implementation) -func (e *NativeSSHExecutor) Transfer(ctx context.Context, src, dst string, download bool, limit int) error { +func (e *NativeSSHExecutor) Transfer(ctx context.Context, src, dst string, download bool, limit int, compress bool) error { if e.ConnectionTestResult != nil { return e.ConnectionTestResult } @@ -372,6 +372,9 @@ func (e *NativeSSHExecutor) Transfer(ctx context.Context, src, dst string, downl if limit > 0 { args = append(args, "-l", fmt.Sprint(limit)) } + if compress { + args = append(args, "-C") + } args = e.configArgs(args, true) // prefix and postfix args if download { diff --git a/pkg/cluster/manager/transfer.go b/pkg/cluster/manager/transfer.go index 24304c8e2a..f28aafde78 100644 --- a/pkg/cluster/manager/transfer.go +++ b/pkg/cluster/manager/transfer.go @@ -35,10 +35,11 @@ import ( // TransferOptions for exec shell commanm. type TransferOptions struct { - Local string - Remote string - Pull bool // default to push - Limit int // rate limit in Kbit/s + Local string + Remote string + Pull bool // default to push + Limit int // rate limit in Kbit/s + Compress bool // enable compress } // Transfer copies files from or to host in the tidb cluster. @@ -94,9 +95,9 @@ func (m *Manager) Transfer(name string, opt TransferOptions, gOpt operator.Optio for _, p := range i.Slice() { t := task.NewBuilder(gOpt.DisplayMode) if opt.Pull { - t.CopyFile(p, srcPath, host, opt.Pull, opt.Limit) + t.CopyFile(p, srcPath, host, opt.Pull, opt.Limit, opt.Compress) } else { - t.CopyFile(srcPath, p, host, opt.Pull, opt.Limit) + t.CopyFile(srcPath, p, host, opt.Pull, opt.Limit, opt.Compress) } shellTasks = append(shellTasks, t.Build()) } diff --git a/pkg/cluster/operation/telemetry.go b/pkg/cluster/operation/telemetry.go index df4c451ad5..ba2551db58 100644 --- a/pkg/cluster/operation/telemetry.go +++ b/pkg/cluster/operation/telemetry.go @@ -87,7 +87,7 @@ func GetNodeInfo( dstDir := filepath.Join(dir, "bin") dstPath := filepath.Join(dstDir, path.Base(srcPath)) - err = exec.Transfer(nctx, srcPath, dstPath, false, 0) + err = exec.Transfer(nctx, srcPath, dstPath, false, 0, false) if err != nil { return err } diff --git a/pkg/cluster/spec/alertmanager.go b/pkg/cluster/spec/alertmanager.go index d2e8d63f21..a0298f211f 100644 --- a/pkg/cluster/spec/alertmanager.go +++ b/pkg/cluster/spec/alertmanager.go @@ -154,7 +154,7 @@ func (i *AlertManagerInstance) InitConfig( } dst := filepath.Join(paths.Deploy, "scripts", "run_alertmanager.sh") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil { diff --git a/pkg/cluster/spec/cdc.go b/pkg/cluster/spec/cdc.go index 24577176b8..1f4dfbd764 100644 --- a/pkg/cluster/spec/cdc.go +++ b/pkg/cluster/spec/cdc.go @@ -188,7 +188,7 @@ func (i *CDCInstance) InitConfig( return err } dst := filepath.Join(paths.Deploy, "scripts", "run_cdc.sh") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } diff --git a/pkg/cluster/spec/drainer.go b/pkg/cluster/spec/drainer.go index 2798476fb9..5de4049a54 100644 --- a/pkg/cluster/spec/drainer.go +++ b/pkg/cluster/spec/drainer.go @@ -177,7 +177,7 @@ func (i *DrainerInstance) InitConfig( return err } dst := filepath.Join(paths.Deploy, "scripts", "run_drainer.sh") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } diff --git a/pkg/cluster/spec/grafana.go b/pkg/cluster/spec/grafana.go index 57efad8587..112f286b6c 100644 --- a/pkg/cluster/spec/grafana.go +++ b/pkg/cluster/spec/grafana.go @@ -151,7 +151,7 @@ func (i *GrafanaInstance) InitConfig( } dst := filepath.Join(paths.Deploy, "scripts", "run_grafana.sh") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } @@ -176,7 +176,7 @@ func (i *GrafanaInstance) InitConfig( return err } dst = filepath.Join(paths.Deploy, "conf", "grafana.ini") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } @@ -277,7 +277,7 @@ func (i *GrafanaInstance) installDashboards(ctx context.Context, e ctxt.Executor srcPath := PackagePath(ComponentDMMaster, clusterVersion, i.OS(), i.Arch()) dstPath := filepath.Join(tmp, filepath.Base(srcPath)) - err = e.Transfer(ctx, srcPath, dstPath, false, 0) + err = e.Transfer(ctx, srcPath, dstPath, false, 0, false) if err != nil { return err } diff --git a/pkg/cluster/spec/instance.go b/pkg/cluster/spec/instance.go index 80e740c94e..d1dadfb07c 100644 --- a/pkg/cluster/spec/instance.go +++ b/pkg/cluster/spec/instance.go @@ -182,7 +182,7 @@ func (i *BaseInstance) InitConfig(ctx context.Context, e ctxt.Executor, opt Glob return errors.Trace(err) } tgt := filepath.Join("/tmp", comp+"_"+uuid.New().String()+".service") - if err := e.Transfer(ctx, sysCfg, tgt, false, 0); err != nil { + if err := e.Transfer(ctx, sysCfg, tgt, false, 0, false); err != nil { return errors.Annotatef(err, "transfer from %s to %s failed", sysCfg, tgt) } cmd := fmt.Sprintf("mv %s /etc/systemd/system/%s-%d.service", tgt, comp, port) @@ -203,7 +203,7 @@ func (i *BaseInstance) TransferLocalConfigFile(ctx context.Context, e ctxt.Execu return errors.Annotatef(err, "execute: %s", cmd) } - if err := e.Transfer(ctx, local, remote, false, 0); err != nil { + if err := e.Transfer(ctx, local, remote, false, 0, false); err != nil { return errors.Annotatef(err, "transfer from %s to %s failed", local, remote) } @@ -255,7 +255,7 @@ func (i *BaseInstance) MergeServerConfig(ctx context.Context, e ctxt.Executor, g } dst := filepath.Join(paths.Deploy, "conf", fmt.Sprintf("%s.toml", i.ComponentName())) // transfer config - return e.Transfer(ctx, fp, dst, false, 0) + return e.Transfer(ctx, fp, dst, false, 0, false) } // mergeTiFlashLearnerServerConfig merges the server configuration and overwrite the global configuration @@ -271,7 +271,7 @@ func (i *BaseInstance) mergeTiFlashLearnerServerConfig(ctx context.Context, e ct } dst := filepath.Join(paths.Deploy, "conf", fmt.Sprintf("%s-learner.toml", i.ComponentName())) // transfer config - return e.Transfer(ctx, fp, dst, false, 0) + return e.Transfer(ctx, fp, dst, false, 0, false) } // ID returns the identifier of this instance, the ID is constructed by host:port diff --git a/pkg/cluster/spec/monitoring.go b/pkg/cluster/spec/monitoring.go index f9b222136c..38d77430f4 100644 --- a/pkg/cluster/spec/monitoring.go +++ b/pkg/cluster/spec/monitoring.go @@ -177,7 +177,7 @@ func (i *MonitorInstance) InitConfig( } dst := filepath.Join(paths.Deploy, "scripts", "run_prometheus.sh") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } @@ -328,7 +328,7 @@ func (i *MonitorInstance) InitConfig( return err } dst = filepath.Join(paths.Deploy, "conf", "ngmonitoring.toml") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } @@ -337,7 +337,7 @@ func (i *MonitorInstance) InitConfig( return err } dst = filepath.Join(paths.Deploy, "conf", "prometheus.yml") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } @@ -363,7 +363,7 @@ func (i *MonitorInstance) installRules(ctx context.Context, e ctxt.Executor, dep srcPath := PackagePath(ComponentDMMaster, clusterVersion, i.OS(), i.Arch()) dstPath := filepath.Join(tmp, filepath.Base(srcPath)) - err = e.Transfer(ctx, srcPath, dstPath, false, 0) + err = e.Transfer(ctx, srcPath, dstPath, false, 0, false) if err != nil { return err } diff --git a/pkg/cluster/spec/pd.go b/pkg/cluster/spec/pd.go index d5e14bb367..0440a9742a 100644 --- a/pkg/cluster/spec/pd.go +++ b/pkg/cluster/spec/pd.go @@ -191,7 +191,7 @@ func (i *PDInstance) InitConfig( return err } dst := filepath.Join(paths.Deploy, "scripts", "run_pd.sh") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil { @@ -292,7 +292,7 @@ func (i *PDInstance) ScaleConfig( } dst := filepath.Join(paths.Deploy, "scripts", "run_pd.sh") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil { diff --git a/pkg/cluster/spec/pump.go b/pkg/cluster/spec/pump.go index 1ba15ec634..93b95fe1b1 100644 --- a/pkg/cluster/spec/pump.go +++ b/pkg/cluster/spec/pump.go @@ -172,7 +172,7 @@ func (i *PumpInstance) InitConfig( return err } dst := filepath.Join(paths.Deploy, "scripts", "run_pump.sh") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } diff --git a/pkg/cluster/spec/tidb.go b/pkg/cluster/spec/tidb.go index 0c073432c7..4e81e6ecc1 100644 --- a/pkg/cluster/spec/tidb.go +++ b/pkg/cluster/spec/tidb.go @@ -151,7 +151,7 @@ func (i *TiDBInstance) InitConfig( } dst := filepath.Join(paths.Deploy, "scripts", "run_tidb.sh") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil { diff --git a/pkg/cluster/spec/tiflash.go b/pkg/cluster/spec/tiflash.go index 24e6d420ec..3aa90869a0 100644 --- a/pkg/cluster/spec/tiflash.go +++ b/pkg/cluster/spec/tiflash.go @@ -603,7 +603,7 @@ func (i *TiFlashInstance) InitConfig( } dst := filepath.Join(paths.Deploy, "scripts", "run_tiflash.sh") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } diff --git a/pkg/cluster/spec/tikv.go b/pkg/cluster/spec/tikv.go index 108a1f2bb5..de4b829953 100644 --- a/pkg/cluster/spec/tikv.go +++ b/pkg/cluster/spec/tikv.go @@ -226,7 +226,7 @@ func (i *TiKVInstance) InitConfig( } dst := filepath.Join(paths.Deploy, "scripts", "run_tikv.sh") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } diff --git a/pkg/cluster/spec/tispark.go b/pkg/cluster/spec/tispark.go index 3a41b08ca8..09bcdf0f24 100644 --- a/pkg/cluster/spec/tispark.go +++ b/pkg/cluster/spec/tispark.go @@ -223,7 +223,7 @@ func (i *TiSparkMasterInstance) InitConfig( return errors.Trace(err) } tgt := filepath.Join("/tmp", comp+"_"+uuid.New().String()+".service") - if err := e.Transfer(ctx, sysCfg, tgt, false, 0); err != nil { + if err := e.Transfer(ctx, sysCfg, tgt, false, 0, false); err != nil { return errors.Annotatef(err, "transfer from %s to %s failed", sysCfg, tgt) } cmd := fmt.Sprintf("mv %s /etc/systemd/system/%s-%d.service", tgt, comp, port) @@ -249,7 +249,7 @@ func (i *TiSparkMasterInstance) InitConfig( return err } dst := filepath.Join(paths.Deploy, "conf", "spark-defaults.conf") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } @@ -265,7 +265,7 @@ func (i *TiSparkMasterInstance) InitConfig( } // tispark files are all in a "spark" sub-directory of deploy dir dst = filepath.Join(paths.Deploy, "conf", "spark-env.sh") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } @@ -279,7 +279,7 @@ func (i *TiSparkMasterInstance) InitConfig( return err } dst = filepath.Join(paths.Deploy, "conf", "log4j.properties") - return e.Transfer(ctx, fp, dst, false, 0) + return e.Transfer(ctx, fp, dst, false, 0, false) } // ScaleConfig deploy temporary config on scaling @@ -386,7 +386,7 @@ func (i *TiSparkWorkerInstance) InitConfig( return errors.Trace(err) } tgt := filepath.Join("/tmp", comp+"_"+uuid.New().String()+".service") - if err := e.Transfer(ctx, sysCfg, tgt, false, 0); err != nil { + if err := e.Transfer(ctx, sysCfg, tgt, false, 0, false); err != nil { return errors.Annotatef(err, "transfer from %s to %s failed", sysCfg, tgt) } cmd := fmt.Sprintf("mv %s /etc/systemd/system/%s-%d.service", tgt, comp, port) @@ -412,7 +412,7 @@ func (i *TiSparkWorkerInstance) InitConfig( return err } dst := filepath.Join(paths.Deploy, "conf", "spark-defaults.conf") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } @@ -429,7 +429,7 @@ func (i *TiSparkWorkerInstance) InitConfig( } // tispark files are all in a "spark" sub-directory of deploy dir dst = filepath.Join(paths.Deploy, "conf", "spark-env.sh") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } @@ -443,7 +443,7 @@ func (i *TiSparkWorkerInstance) InitConfig( return err } dst = filepath.Join(paths.Deploy, "sbin", "start-slave.sh") - if err := e.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } @@ -457,7 +457,7 @@ func (i *TiSparkWorkerInstance) InitConfig( return err } dst = filepath.Join(paths.Deploy, "conf", "log4j.properties") - return e.Transfer(ctx, fp, dst, false, 0) + return e.Transfer(ctx, fp, dst, false, 0, false) } // ScaleConfig deploy temporary config on scaling diff --git a/pkg/cluster/task/builder.go b/pkg/cluster/task/builder.go index e6ce3c610f..db7cae12f9 100644 --- a/pkg/cluster/task/builder.go +++ b/pkg/cluster/task/builder.go @@ -172,13 +172,14 @@ func (b *Builder) UpdateTopology(cluster, profile string, metadata *spec.Cluster } // CopyFile appends a CopyFile task to the current task collection -func (b *Builder) CopyFile(src, dst, server string, download bool, limit int) *Builder { +func (b *Builder) CopyFile(src, dst, server string, download bool, limit int, compress bool) *Builder { b.tasks = append(b.tasks, &CopyFile{ src: src, dst: dst, remote: server, download: download, limit: limit, + compress: compress, }) return b } diff --git a/pkg/cluster/task/copy_file.go b/pkg/cluster/task/copy_file.go index dc85db8a2f..bd56a8088c 100644 --- a/pkg/cluster/task/copy_file.go +++ b/pkg/cluster/task/copy_file.go @@ -28,6 +28,7 @@ type CopyFile struct { remote string download bool limit int + compress bool } // Execute implements the Task interface @@ -37,7 +38,7 @@ func (c *CopyFile) Execute(ctx context.Context) error { return ErrNoExecutor } - err := e.Transfer(ctx, c.src, c.dst, c.download, c.limit) + err := e.Transfer(ctx, c.src, c.dst, c.download, c.limit, c.compress) if err != nil { return errors.Annotate(err, "failed to transfer file") } diff --git a/pkg/cluster/task/init_config_test.go b/pkg/cluster/task/init_config_test.go index 9833217880..b25450a873 100644 --- a/pkg/cluster/task/init_config_test.go +++ b/pkg/cluster/task/init_config_test.go @@ -37,7 +37,7 @@ func (e *fakeExecutor) Execute(ctx context.Context, cmd string, sudo bool, timeo return []byte{}, []byte{}, nil } -func (e *fakeExecutor) Transfer(ctx context.Context, src, dst string, download bool, limit int) error { +func (e *fakeExecutor) Transfer(ctx context.Context, src, dst string, download bool, limit int, compress bool) error { return nil } diff --git a/pkg/cluster/task/install_package.go b/pkg/cluster/task/install_package.go index 9c14d67e4a..19d8aa5756 100644 --- a/pkg/cluster/task/install_package.go +++ b/pkg/cluster/task/install_package.go @@ -42,7 +42,7 @@ func (c *InstallPackage) Execute(ctx context.Context) error { dstDir := filepath.Join(c.dstDir, "bin") dstPath := filepath.Join(dstDir, path.Base(c.srcPath)) - err := exec.Transfer(ctx, c.srcPath, dstPath, false, 0) + err := exec.Transfer(ctx, c.srcPath, dstPath, false, 0, false) if err != nil { return errors.Annotatef(err, "failed to scp %s to %s:%s", c.srcPath, c.host, dstPath) } diff --git a/pkg/cluster/task/monitored_config.go b/pkg/cluster/task/monitored_config.go index 076115ee03..ca733b1149 100644 --- a/pkg/cluster/task/monitored_config.go +++ b/pkg/cluster/task/monitored_config.go @@ -113,7 +113,7 @@ func (m *MonitoredConfig) syncMonitoredSystemConfig(ctx context.Context, exec ct return err } tgt := filepath.Join("/tmp", comp+"_"+uuid.New().String()+".service") - if err := exec.Transfer(ctx, sysCfg, tgt, false, 0); err != nil { + if err := exec.Transfer(ctx, sysCfg, tgt, false, 0, false); err != nil { return err } if outp, errp, err := exec.Execute(ctx, fmt.Sprintf("mv %s /etc/systemd/system/%s-%d.service", tgt, comp, port), true); err != nil { @@ -134,7 +134,7 @@ func (m *MonitoredConfig) syncMonitoredScript(ctx context.Context, exec ctxt.Exe return err } dst := filepath.Join(m.paths.Deploy, "scripts", fmt.Sprintf("run_%s.sh", comp)) - if err := exec.Transfer(ctx, fp, dst, false, 0); err != nil { + if err := exec.Transfer(ctx, fp, dst, false, 0, false); err != nil { return err } if _, _, err := exec.Execute(ctx, "chmod +x "+dst, false); err != nil { @@ -150,7 +150,7 @@ func (m *MonitoredConfig) syncBlackboxConfig(ctx context.Context, exec ctxt.Exec return err } dst := filepath.Join(m.paths.Deploy, "conf", "blackbox.yml") - return exec.Transfer(ctx, fp, dst, false, 0) + return exec.Transfer(ctx, fp, dst, false, 0, false) } // Rollback implements the Task interface diff --git a/pkg/cluster/task/tls.go b/pkg/cluster/task/tls.go index 5006f2a764..367a7c1636 100644 --- a/pkg/cluster/task/tls.go +++ b/pkg/cluster/task/tls.go @@ -104,19 +104,22 @@ func (c *TLSCert) Execute(ctx context.Context) error { if err := e.Transfer(ctx, caFile, filepath.Join(c.paths.Deploy, "tls", spec.TLSCACert), false, /* download */ - 0 /* limit */); err != nil { + 0, /* limit */ + false /* compress */); err != nil { return errors.Annotate(err, "failed to transfer CA cert to server") } if err := e.Transfer(ctx, keyFile, filepath.Join(c.paths.Deploy, "tls", fmt.Sprintf("%s.pem", c.role)), false, /* download */ - 0 /* limit */); err != nil { + 0, /* limit */ + false /* compress */); err != nil { return errors.Annotate(err, "failed to transfer TLS private key to server") } if err := e.Transfer(ctx, certFile, filepath.Join(c.paths.Deploy, "tls", fmt.Sprintf("%s.crt", c.role)), false, /* download */ - 0 /* limit */); err != nil { + 0, /* limit */ + false /* compress */); err != nil { return errors.Annotate(err, "failed to transfer TLS cert to server") }