Skip to content

Commit

Permalink
cluster: add compress option to scp (#1623)
Browse files Browse the repository at this point in the history
  • Loading branch information
AstroProfundis authored Nov 16, 2021
1 parent 4f09e34 commit 2afffed
Show file tree
Hide file tree
Showing 31 changed files with 86 additions and 67 deletions.
1 change: 1 addition & 0 deletions components/cluster/command/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func newPullCmd() *cobra.Command {
cmd.Flags().StringSliceVarP(&gOpt.Roles, "role", "R", nil, "Only exec on host with specified roles")
cmd.Flags().StringSliceVarP(&gOpt.Nodes, "node", "N", nil, "Only exec on host with specified nodes")
cmd.Flags().IntVarP(&opt.Limit, "limit", "l", 0, "Limits the used bandwidth, specified in Kbit/s")
cmd.Flags().BoolVar(&opt.Compress, "compress", false, "Compression enable. Passes the -C flag to ssh(1) to enable compression.")

return cmd
}
Expand Down
4 changes: 2 additions & 2 deletions components/dm/ansible/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (im *Importer) fetchFile(ctx context.Context, host string, port int, fname

tmp = filepath.Join(tmp, filepath.Base(fname))

err = e.Transfer(ctx, fname, tmp, true /*download*/, 0)
err = e.Transfer(ctx, fname, tmp, true /*download*/, 0, false)
if err != nil {
return nil, errors.Annotatef(err, "transfer %s from %s:%d", fname, host, port)
}
Expand Down Expand Up @@ -254,7 +254,7 @@ func (im *Importer) ScpSourceToMaster(ctx context.Context, topo *spec.Specificat
return errors.AddStack(err)
}

err = e.Transfer(ctx, f.Name(), filepath.Join(target, addr+".yml"), false, 0)
err = e.Transfer(ctx, f.Name(), filepath.Join(target, addr+".yml"), false, 0, false)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions components/dm/ansible/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ func (g *executorGetter) Get(host string) ctxt.Executor {

// Transfer implements executor interface.
// Replace the deploy directory as the local one in testdata, so we can fetch it.
func (l *localExecutor) Transfer(ctx context.Context, src, target string, download bool, limit int) error {
func (l *localExecutor) Transfer(ctx context.Context, src, target string, download bool, limit int, _ bool) error {
mydeploy, err := filepath.Abs("./testdata/deploy_dir/" + l.host)
if err != nil {
return errors.AddStack(err)
}
src = strings.Replace(src, "/home/tidb/deploy", mydeploy, 1)
return l.Local.Transfer(ctx, src, target, download, 0)
return l.Local.Transfer(ctx, src, target, download, 0, false)
}

func TestParseRunScript(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions components/dm/spec/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (i *MasterInstance) InitConfig(
return err
}
dst := filepath.Join(paths.Deploy, "scripts", "run_dm-master.sh")
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil {
return err
}
if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil {
Expand Down Expand Up @@ -177,7 +177,7 @@ func (i *MasterInstance) ScaleConfig(
}

dst := filepath.Join(paths.Deploy, "scripts", "run_dm-master.sh")
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil {
return err
}
if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil {
Expand Down Expand Up @@ -266,7 +266,7 @@ func (i *WorkerInstance) InitConfig(
}
dst := filepath.Join(paths.Deploy, "scripts", "run_dm-worker.sh")

if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil {
return err
}

Expand Down
9 changes: 6 additions & 3 deletions pkg/cluster/ansible/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ func ImportConfig(name string, clsMeta *spec.ClusterMeta, gOpt operator.Options)
inst.GetPort())),
inst.GetHost(),
true,
0).
0,
false).
Build()
copyFileTasks = append(copyFileTasks, t)
case spec.ComponentTiFlash:
Expand Down Expand Up @@ -113,7 +114,8 @@ func ImportConfig(name string, clsMeta *spec.ClusterMeta, gOpt operator.Options)
inst.GetPort())),
inst.GetHost(),
true,
0).
0,
false).
CopyFile(filepath.Join(inst.DeployDir(), "conf", inst.ComponentName()+"-learner.toml"),
spec.ClusterPath(name,
spec.AnsibleImportedConfigPath,
Expand All @@ -123,7 +125,8 @@ func ImportConfig(name string, clsMeta *spec.ClusterMeta, gOpt operator.Options)
inst.GetPort())),
inst.GetHost(),
true,
0).
0,
false).
Build()
copyFileTasks = append(copyFileTasks, t)
default:
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/ctxt/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type (
Execute(ctx context.Context, cmd string, sudo bool, timeout ...time.Duration) (stdout []byte, stderr []byte, err error)

// Transfer copies files from or to a target
Transfer(ctx context.Context, src, dst string, download bool, limit int) error
Transfer(ctx context.Context, src, dst string, download bool, limit int, compress bool) error
}

// ExecutorGetter get the executor by host.
Expand Down
5 changes: 3 additions & 2 deletions pkg/cluster/executor/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (c *CheckPointExecutor) Execute(ctx context.Context, cmd string, sudo bool,
}

// Transfer implements Executer interface.
func (c *CheckPointExecutor) Transfer(ctx context.Context, src, dst string, download bool, limit int) (err error) {
func (c *CheckPointExecutor) Transfer(ctx context.Context, src, dst string, download bool, limit int, compress bool) (err error) {
point := checkpoint.Acquire(ctx, scpPoint, map[string]interface{}{
"host": c.config.Host,
"port": c.config.Port,
Expand All @@ -95,6 +95,7 @@ func (c *CheckPointExecutor) Transfer(ctx context.Context, src, dst string, down
"dst": dst,
"download": download,
"limit": limit,
"compress": compress,
})
defer func() {
point.Release(err,
Expand All @@ -109,5 +110,5 @@ func (c *CheckPointExecutor) Transfer(ctx context.Context, src, dst string, down
return nil
}

return c.Executor.Transfer(ctx, src, dst, download, limit)
return c.Executor.Transfer(ctx, src, dst, download, limit, compress)
}
2 changes: 1 addition & 1 deletion pkg/cluster/executor/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (l *Local) Execute(ctx context.Context, cmd string, sudo bool, timeout ...t
}

// Transfer implements Executer interface.
func (l *Local) Transfer(ctx context.Context, src, dst string, download bool, limit int) error {
func (l *Local) Transfer(ctx context.Context, src, dst string, download bool, limit int, _ bool) error {
targetPath := filepath.Dir(dst)
if err := utils.CreateDir(targetPath); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/executor/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestLocal(t *testing.T) {
defer os.Remove(dst.Name())

// Transfer src to dst and check it.
err = local.Transfer(ctx, src.Name(), dst.Name(), false, 0)
err = local.Transfer(ctx, src.Name(), dst.Name(), false, 0, false)
assert.Nil(err)

data, err := os.ReadFile(dst.Name())
Expand Down
11 changes: 8 additions & 3 deletions pkg/cluster/executor/scp.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

// ScpDownload downloads a file from remote with SCP
// The implementation is partially inspired by github.com/dtylman/scp
func ScpDownload(session *ssh.Session, client *ssh.Client, src, dst string, limit int) error {
func ScpDownload(session *ssh.Session, client *ssh.Client, src, dst string, limit int, compress bool) error {
r, err := session.StdoutPipe()
if err != nil {
return err
Expand Down Expand Up @@ -97,10 +97,15 @@ func ScpDownload(session *ssh.Session, client *ssh.Client, src, dst string, limi
copyErrC <- copyF()
}()

remoteCmd := fmt.Sprintf("scp -f %s", src)
remoteArgs := make([]string, 0)
if compress {
remoteArgs = append(remoteArgs, "-C")
}
if limit > 0 {
remoteCmd = fmt.Sprintf("scp -l %d -f %s", limit, src)
remoteArgs = append(remoteArgs, fmt.Sprintf("-l %d", limit))
}
remoteCmd := fmt.Sprintf("scp %s -f %s", strings.Join(remoteArgs, " "), src)

err = session.Start(remoteCmd)
if err != nil {
return err
Expand Down
9 changes: 6 additions & 3 deletions pkg/cluster/executor/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (e *EasySSHExecutor) Execute(ctx context.Context, cmd string, sudo bool, ti
// This function depends on `scp` (a tool from OpenSSH or other SSH implementation)
// This function is based on easyssh.MakeConfig.Scp() but with support of copying
// file from remote to local.
func (e *EasySSHExecutor) Transfer(ctx context.Context, src, dst string, download bool, limit int) error {
func (e *EasySSHExecutor) Transfer(ctx context.Context, src, dst string, download bool, limit int, compress bool) error {
if !download {
err := e.Config.Scp(src, dst)
if err != nil {
Expand All @@ -217,7 +217,7 @@ func (e *EasySSHExecutor) Transfer(ctx context.Context, src, dst string, downloa
defer client.Close()
defer session.Close()

return ScpDownload(session, client, src, dst, limit)
return ScpDownload(session, client, src, dst, limit, compress)
}

func (e *NativeSSHExecutor) prompt(def string) string {
Expand Down Expand Up @@ -354,7 +354,7 @@ func (e *NativeSSHExecutor) Execute(ctx context.Context, cmd string, sudo bool,

// Transfer copies files via SCP
// This function depends on `scp` (a tool from OpenSSH or other SSH implementation)
func (e *NativeSSHExecutor) Transfer(ctx context.Context, src, dst string, download bool, limit int) error {
func (e *NativeSSHExecutor) Transfer(ctx context.Context, src, dst string, download bool, limit int, compress bool) error {
if e.ConnectionTestResult != nil {
return e.ConnectionTestResult
}
Expand All @@ -372,6 +372,9 @@ func (e *NativeSSHExecutor) Transfer(ctx context.Context, src, dst string, downl
if limit > 0 {
args = append(args, "-l", fmt.Sprint(limit))
}
if compress {
args = append(args, "-C")
}
args = e.configArgs(args, true) // prefix and postfix args

if download {
Expand Down
13 changes: 7 additions & 6 deletions pkg/cluster/manager/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ import (

// TransferOptions for exec shell commanm.
type TransferOptions struct {
Local string
Remote string
Pull bool // default to push
Limit int // rate limit in Kbit/s
Local string
Remote string
Pull bool // default to push
Limit int // rate limit in Kbit/s
Compress bool // enable compress
}

// Transfer copies files from or to host in the tidb cluster.
Expand Down Expand Up @@ -94,9 +95,9 @@ func (m *Manager) Transfer(name string, opt TransferOptions, gOpt operator.Optio
for _, p := range i.Slice() {
t := task.NewBuilder(gOpt.DisplayMode)
if opt.Pull {
t.CopyFile(p, srcPath, host, opt.Pull, opt.Limit)
t.CopyFile(p, srcPath, host, opt.Pull, opt.Limit, opt.Compress)
} else {
t.CopyFile(srcPath, p, host, opt.Pull, opt.Limit)
t.CopyFile(srcPath, p, host, opt.Pull, opt.Limit, opt.Compress)
}
shellTasks = append(shellTasks, t.Build())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/operation/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func GetNodeInfo(

dstDir := filepath.Join(dir, "bin")
dstPath := filepath.Join(dstDir, path.Base(srcPath))
err = exec.Transfer(nctx, srcPath, dstPath, false, 0)
err = exec.Transfer(nctx, srcPath, dstPath, false, 0, false)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/spec/alertmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (i *AlertManagerInstance) InitConfig(
}

dst := filepath.Join(paths.Deploy, "scripts", "run_alertmanager.sh")
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil {
return err
}
if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/spec/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (i *CDCInstance) InitConfig(
return err
}
dst := filepath.Join(paths.Deploy, "scripts", "run_cdc.sh")
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/spec/drainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (i *DrainerInstance) InitConfig(
return err
}
dst := filepath.Join(paths.Deploy, "scripts", "run_drainer.sh")
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil {
return err
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/cluster/spec/grafana.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (i *GrafanaInstance) InitConfig(
}

dst := filepath.Join(paths.Deploy, "scripts", "run_grafana.sh")
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil {
return err
}

Expand All @@ -176,7 +176,7 @@ func (i *GrafanaInstance) InitConfig(
return err
}
dst = filepath.Join(paths.Deploy, "conf", "grafana.ini")
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil {
return err
}

Expand Down Expand Up @@ -277,7 +277,7 @@ func (i *GrafanaInstance) installDashboards(ctx context.Context, e ctxt.Executor

srcPath := PackagePath(ComponentDMMaster, clusterVersion, i.OS(), i.Arch())
dstPath := filepath.Join(tmp, filepath.Base(srcPath))
err = e.Transfer(ctx, srcPath, dstPath, false, 0)
err = e.Transfer(ctx, srcPath, dstPath, false, 0, false)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/cluster/spec/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (i *BaseInstance) InitConfig(ctx context.Context, e ctxt.Executor, opt Glob
return errors.Trace(err)
}
tgt := filepath.Join("/tmp", comp+"_"+uuid.New().String()+".service")
if err := e.Transfer(ctx, sysCfg, tgt, false, 0); err != nil {
if err := e.Transfer(ctx, sysCfg, tgt, false, 0, false); err != nil {
return errors.Annotatef(err, "transfer from %s to %s failed", sysCfg, tgt)
}
cmd := fmt.Sprintf("mv %s /etc/systemd/system/%s-%d.service", tgt, comp, port)
Expand All @@ -203,7 +203,7 @@ func (i *BaseInstance) TransferLocalConfigFile(ctx context.Context, e ctxt.Execu
return errors.Annotatef(err, "execute: %s", cmd)
}

if err := e.Transfer(ctx, local, remote, false, 0); err != nil {
if err := e.Transfer(ctx, local, remote, false, 0, false); err != nil {
return errors.Annotatef(err, "transfer from %s to %s failed", local, remote)
}

Expand Down Expand Up @@ -255,7 +255,7 @@ func (i *BaseInstance) MergeServerConfig(ctx context.Context, e ctxt.Executor, g
}
dst := filepath.Join(paths.Deploy, "conf", fmt.Sprintf("%s.toml", i.ComponentName()))
// transfer config
return e.Transfer(ctx, fp, dst, false, 0)
return e.Transfer(ctx, fp, dst, false, 0, false)
}

// mergeTiFlashLearnerServerConfig merges the server configuration and overwrite the global configuration
Expand All @@ -271,7 +271,7 @@ func (i *BaseInstance) mergeTiFlashLearnerServerConfig(ctx context.Context, e ct
}
dst := filepath.Join(paths.Deploy, "conf", fmt.Sprintf("%s-learner.toml", i.ComponentName()))
// transfer config
return e.Transfer(ctx, fp, dst, false, 0)
return e.Transfer(ctx, fp, dst, false, 0, false)
}

// ID returns the identifier of this instance, the ID is constructed by host:port
Expand Down
8 changes: 4 additions & 4 deletions pkg/cluster/spec/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (i *MonitorInstance) InitConfig(
}

dst := filepath.Join(paths.Deploy, "scripts", "run_prometheus.sh")
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil {
return err
}

Expand Down Expand Up @@ -328,7 +328,7 @@ func (i *MonitorInstance) InitConfig(
return err
}
dst = filepath.Join(paths.Deploy, "conf", "ngmonitoring.toml")
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil {
return err
}

Expand All @@ -337,7 +337,7 @@ func (i *MonitorInstance) InitConfig(
return err
}
dst = filepath.Join(paths.Deploy, "conf", "prometheus.yml")
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil {
return err
}

Expand All @@ -363,7 +363,7 @@ func (i *MonitorInstance) installRules(ctx context.Context, e ctxt.Executor, dep
srcPath := PackagePath(ComponentDMMaster, clusterVersion, i.OS(), i.Arch())
dstPath := filepath.Join(tmp, filepath.Base(srcPath))

err = e.Transfer(ctx, srcPath, dstPath, false, 0)
err = e.Transfer(ctx, srcPath, dstPath, false, 0, false)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cluster/spec/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (i *PDInstance) InitConfig(
return err
}
dst := filepath.Join(paths.Deploy, "scripts", "run_pd.sh")
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil {
return err
}
if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil {
Expand Down Expand Up @@ -292,7 +292,7 @@ func (i *PDInstance) ScaleConfig(
}

dst := filepath.Join(paths.Deploy, "scripts", "run_pd.sh")
if err := e.Transfer(ctx, fp, dst, false, 0); err != nil {
if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil {
return err
}
if _, _, err := e.Execute(ctx, "chmod +x "+dst, false); err != nil {
Expand Down
Loading

0 comments on commit 2afffed

Please sign in to comment.