Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster: add subcommand pull and push to transfer files #1044

Merged
merged 7 commits into from
Jan 8, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions components/cluster/command/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
func newExecCmd() *cobra.Command {
opt := manager.ExecOptions{}
cmd := &cobra.Command{
Use: "exec <cluster-name>",
Short: "Run shell command on host in the tidb cluster",
Use: "exec <cluster-name>",
Short: "Run shell command on host in the tidb cluster",
Hidden: true,
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) != 1 {
return cmd.Help()
Expand Down
4 changes: 3 additions & 1 deletion components/cluster/command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ func init() {
newDestroyCmd(),
newCleanCmd(),
newUpgradeCmd(),
newExecCmd(),
newDisplayCmd(),
newPruneCmd(),
newListCmd(),
Expand All @@ -161,6 +160,9 @@ func init() {
newRenameCmd(),
newEnableCmd(),
newDisableCmd(),
newExecCmd(),
newPullCmd(),
newPushCmd(),
newTestCmd(), // hidden command for test internally
newTelemetryCmd(),
)
Expand Down
78 changes: 78 additions & 0 deletions components/cluster/command/transfer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package command

import (
"github.com/pingcap/tiup/pkg/cluster/manager"
"github.com/spf13/cobra"
)

/* Add a pair of adb like commands to transfer files to or from remote
servers. Not using `scp` as the real implementation is not necessarily
SSH, not using `transfer` all-in-one command to get rid of complex
checking of wheather a path is remote or local, as this is supposed
to be only a tiny helper utility.
*/

func newPullCmd() *cobra.Command {
opt := manager.TransferOptions{Pull: true}
cmd := &cobra.Command{
Use: "pull <cluster-name> <remote-path> <local-path>",
Short: "(EXPERIMENTAL) Transfer files or directories from host in the tidb cluster to local",
Hidden: true,
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) != 3 {
return cmd.Help()
}

clusterName := args[0]
opt.Remote = args[1]
opt.Local = args[2]
teleCommand = append(teleCommand, scrubClusterName(clusterName))

return cm.Transfer(clusterName, opt, gOpt)
},
}

cmd.Flags().StringSliceVarP(&gOpt.Roles, "role", "R", nil, "Only exec on host with specified roles")
cmd.Flags().StringSliceVarP(&gOpt.Nodes, "node", "N", nil, "Only exec on host with specified nodes")

return cmd
}

func newPushCmd() *cobra.Command {
opt := manager.TransferOptions{Pull: false}
cmd := &cobra.Command{
Use: "push <cluster-name> <local-path> <remote-path>",
Short: "(EXPERIMENTAL) Transfer files or directories from local to host in the tidb cluster",
Hidden: true,
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) != 3 {
return cmd.Help()
}

clusterName := args[0]
opt.Local = args[1]
opt.Remote = args[2]
teleCommand = append(teleCommand, scrubClusterName(clusterName))

return cm.Transfer(clusterName, opt, gOpt)
},
}

cmd.Flags().StringSliceVarP(&gOpt.Roles, "role", "R", nil, "Only exec on host with specified roles")
cmd.Flags().StringSliceVarP(&gOpt.Nodes, "node", "N", nil, "Only exec on host with specified nodes")

return cmd
}
5 changes: 3 additions & 2 deletions components/dm/command/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
func newExecCmd() *cobra.Command {
opt := manager.ExecOptions{}
cmd := &cobra.Command{
Use: "exec <cluster-name>",
Short: "Run shell command on host in the dm cluster",
Use: "exec <cluster-name>",
Short: "Run shell command on host in the dm cluster",
Hidden: true,
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) != 1 {
return cmd.Help()
Expand Down
145 changes: 145 additions & 0 deletions pkg/cluster/manager/transfer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package manager

import (
"bytes"
"fmt"
"html/template"
"strings"

"github.com/joomcode/errorx"
perrs "github.com/pingcap/errors"
operator "github.com/pingcap/tiup/pkg/cluster/operation"
"github.com/pingcap/tiup/pkg/cluster/spec"
"github.com/pingcap/tiup/pkg/cluster/task"
"github.com/pingcap/tiup/pkg/set"
)

// TransferOptions for exec shell commanm.
type TransferOptions struct {
Local string
Remote string
Pull bool // default to push
}

// Transfer copies files from or to host in the tidb cluster.
func (m *Manager) Transfer(name string, opt TransferOptions, gOpt operator.Options) error {
metadata, err := m.meta(name)
if err != nil {
return err
}

topo := metadata.GetTopology()
base := metadata.GetBaseMeta()

filterRoles := set.NewStringSet(gOpt.Roles...)
filterNodes := set.NewStringSet(gOpt.Nodes...)

var shellTasks []task.Task

uniqueHosts := map[string]set.StringSet{} // host-sshPort-port -> {remote-path}
topo.IterInstance(func(inst spec.Instance) {
key := fmt.Sprintf("%s-%d-%d", inst.GetHost(), inst.GetSSHPort(), inst.GetPort())
if _, found := uniqueHosts[key]; !found {
if len(gOpt.Roles) > 0 && !filterRoles.Exist(inst.Role()) {
return
}

if len(gOpt.Nodes) > 0 && !filterNodes.Exist(inst.GetHost()) {
return
}

// render remote path
instPath := opt.Remote
paths, err := renderInstanceSpec(instPath, inst)
if err != nil {
return // skip
}
pathSet := set.NewStringSet(paths...)
if _, ok := uniqueHosts[key]; ok {
uniqueHosts[key].Join(pathSet)
return
}
uniqueHosts[key] = pathSet
}
})

srcPath := opt.Local
for hostKey, i := range uniqueHosts {
host := strings.Split(hostKey, "-")[0]
for _, p := range i.Slice() {
t := task.NewBuilder()
if opt.Pull {
t.CopyFile(p, srcPath, host, opt.Pull)
} else {
t.CopyFile(srcPath, p, host, opt.Pull)
}
shellTasks = append(shellTasks, t.Build())
}
}

t := m.sshTaskBuilder(name, topo, base.User, gOpt).
Parallel(false, shellTasks...).
Build()

execCtx := task.NewContext()
if err := t.Execute(execCtx); err != nil {
if errorx.Cast(err) != nil {
// FIXME: Map possible task errors and give suggestions.
return err
}
return perrs.Trace(err)
}

return nil
}

func renderInstanceSpec(t string, inst spec.Instance) ([]string, error) {
result := make([]string, 0)
switch inst.ComponentName() {
case spec.ComponentTiFlash:
for _, d := range strings.Split(inst.DataDir(), ",") {
tf := inst
tfs, ok := tf.(*spec.TiFlashInstance).InstanceSpec.(spec.TiFlashSpec)
if !ok {
return result, fmt.Errorf("instance type mismatch for %v", inst)
}
tfs.DataDir = d
if s, err := renderSpec(t, tfs, inst.ID()+d); err == nil {
result = append(result, s)
}
}
default:
s, err := renderSpec(t, inst, inst.ID())
if err != nil {
return result, fmt.Errorf("error rendering path for instance %v", inst)
}
result = append(result, s)
}
return result, nil
}

func renderSpec(t string, s interface{}, id string) (string, error) {
tpl, err := template.New(id).Option("missingkey=error").Parse(t)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have write a playground, this should work: https://play.golang.org/p/ZP1fQgRmkFM

if err != nil {
return "", err
}

result := bytes.NewBufferString("")
if err := tpl.Execute(result, s); err != nil {
return "", err
}
return result.String(), nil
}