Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Input file with list of commands to run async on nodes #75

Merged
merged 15 commits into from
Sep 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 100 additions & 15 deletions commands/run.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,85 @@
package commands

import (
"bufio"
"context"
"fmt"
"io"
"os"
"path"
"strconv"
"strings"

cli "github.com/urfave/cli"

"github.com/gxed/go-shellwords"
"github.com/ipfs/iptb/testbed"
"github.com/ipfs/iptb/testbed/interfaces"
)

var RunCmd = cli.Command{
Category: "CORE",
Name: "run",
Usage: "run command on specified nodes (or all)",
Usage: "concurrently run command(s) on specified nodes (or all)",
ArgsUsage: "[nodes] -- <command...>",
Description: `
Commands may also be passed in via stdin or a pipe, e.g. the command

$ iptb run 0 -- echo "Running on node 0"

can be equivalently written as

$ iptb run <<CMD
0 -- echo "Running on node 0"
CMD

or

$ echo '0 -- echo "Running on node 0"' | iptb run

All lines starting with '#' will be ignored, which allows for comments:

$ iptb run <<CMD
# print ipfs peers
0 -- ipfs swarm peers
CMD

Multiple commands may also be passed via stdin/pipe:

$ iptb run <<CMDS
0 -- echo "Running on node 0"
[0,1] -- echo "Running on nodes 0 and 1"
-- echo "Running on all nodes"
CMDS

Note that any single call to ` + "`iptb run`" + ` runs *all* commands concurrently. So,
in the above example, there is no guarantee as to the order in which the lines
are printed.
`,
Flags: []cli.Flag{
cli.BoolFlag{
Name: "terminator",
Hidden: true,
},
cli.BoolFlag{
Name: "stdin",
Hidden: true,
},
},
Before: func(c *cli.Context) error {
if c.NArg() == 0 {
finfo, err := os.Stdin.Stat()
if err != nil {
return err
}
if finfo.Size() == 0 && finfo.Mode()&os.ModeNamedPipe == 0 {
return fmt.Errorf("error: no command input and stdin is empty")
}
return c.Set("stdin", "true")
}
if present := isTerminatorPresent(c); present {
return c.Set("terminator", "true")
}

return nil
},
Action: func(c *cli.Context) error {
Expand All @@ -39,26 +92,58 @@ var RunCmd = cli.Command{
return err
}

nodeRange, args := parseCommand(c.Args(), c.IsSet("terminator"))

if nodeRange == "" {
nodeRange = fmt.Sprintf("[0-%d]", len(nodes)-1)
var reader io.Reader
if c.IsSet("stdin") {
reader = bufio.NewReader(os.Stdin)
} else {
var builder strings.Builder
if c.IsSet("terminator") {
builder.WriteString("-- ")
}
for i, arg := range c.Args() {
builder.WriteString(strconv.Quote(arg))
if i != c.NArg()-1 {
builder.WriteString(" ")
}
}
reader = strings.NewReader(builder.String())
}

list, err := parseRange(nodeRange)
if err != nil {
return fmt.Errorf("could not parse node range %s", nodeRange)
var args [][]string
scanner := bufio.NewScanner(reader)
line := 1
for scanner.Scan() {
tokens, err := shellwords.Parse(scanner.Text())
if err != nil {
return fmt.Errorf("parse error on line %d: %s", line, err)
}
if strings.HasPrefix(tokens[0], "#") {
continue
}
args = append(args, tokens)
line++
}

runCmd := func(node testbedi.Core) (testbedi.Output, error) {
return node.RunCmd(context.Background(), nil, args...)
}
ranges := make([][]int, len(args))
runCmds := make([]outputFunc, len(args))
for i, cmd := range args {
nodeRange, tokens := parseCommand(cmd, false)
if nodeRange == "" {
nodeRange = fmt.Sprintf("[0-%d]", len(nodes)-1)
}
list, err := parseRange(nodeRange)
if err != nil {
return fmt.Errorf("could not parse node range %s", nodeRange)
}
ranges[i] = list

results, err := mapWithOutput(list, nodes, runCmd)
if err != nil {
return err
runCmd := func(node testbedi.Core) (testbedi.Output, error) {
return node.RunCmd(context.Background(), nil, tokens...)
}
runCmds[i] = runCmd
}

results, err := mapListWithOutput(ranges, nodes, runCmds)
return buildReport(results)
},
}
43 changes: 43 additions & 0 deletions commands/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func parseCommand(args []string, terminator bool) (string, []string) {
return args[0], []string{}
}

if args[0] == "--" {
return "", args[1:]
}

arguments := args[1:]

if arguments[0] == "--" {
Expand Down Expand Up @@ -128,6 +132,45 @@ type Result struct {

type outputFunc func(testbedi.Core) (testbedi.Output, error)

func mapListWithOutput(ranges [][]int, nodes []testbedi.Core, fns []outputFunc) ([]Result, error) {
var wg sync.WaitGroup
var lk sync.Mutex
var errs []error

total := 0
offsets := make([]int, len(ranges))
for i, list := range ranges {
offsets[i] = total
total += len(list)
}
results := make([]Result, total)

for i, list := range ranges {
wg.Add(1)
go func(i int, list []int) {
defer wg.Done()
results_i, err := mapWithOutput(list, nodes, fns[i])

lk.Lock()
defer lk.Unlock()

if err != nil {
errs = append(errs, err)
}
for j, result := range results_i {
results[offsets[i]+j] = result
}
}(i, list)
wg.Wait()
}

if len(errs) != 0 {
return results, cli.NewMultiError(errs...)
}

return results, nil
}

func mapWithOutput(list []int, nodes []testbedi.Core, fn outputFunc) ([]Result, error) {
var wg sync.WaitGroup
var lk sync.Mutex
Expand Down
6 changes: 6 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
"hash": "QmVmDhyTTUcQXFD1rRQ64fGLMSAoaQvNH3hwuaCFAPq2hy",
"name": "errors",
"version": "0.0.1"
},
{
"author": "mattn",
"hash": "QmbgMitQCJBeteUr9jGMyTRFjhZQjXKW5Kif6UdK4aW6W6",
"name": "go-shellwords",
"version": "0.0.0"
}
],
"gxVersion": "0.6.0",
Expand Down