Skip to content

Commit

Permalink
Merge pull request #75 from dgrisham/feat/async-run-list
Browse files Browse the repository at this point in the history
Input file with list of commands to run async on nodes
  • Loading branch information
dgrisham authored Sep 14, 2018
2 parents 3846dce + d4685c2 commit 7928974
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 15 deletions.
115 changes: 100 additions & 15 deletions commands/run.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,85 @@
package commands

import (
"bufio"
"context"
"fmt"
"io"
"os"
"path"
"strconv"
"strings"

cli "github.com/urfave/cli"

"github.com/gxed/go-shellwords"
"github.com/ipfs/iptb/testbed"
"github.com/ipfs/iptb/testbed/interfaces"
)

var RunCmd = cli.Command{
Category: "CORE",
Name: "run",
Usage: "run command on specified nodes (or all)",
Usage: "concurrently run command(s) on specified nodes (or all)",
ArgsUsage: "[nodes] -- <command...>",
Description: `
Commands may also be passed in via stdin or a pipe, e.g. the command
$ iptb run 0 -- echo "Running on node 0"
can be equivalently written as
$ iptb run <<CMD
0 -- echo "Running on node 0"
CMD
or
$ echo '0 -- echo "Running on node 0"' | iptb run
All lines starting with '#' will be ignored, which allows for comments:
$ iptb run <<CMD
# print ipfs peers
0 -- ipfs swarm peers
CMD
Multiple commands may also be passed via stdin/pipe:
$ iptb run <<CMDS
0 -- echo "Running on node 0"
[0,1] -- echo "Running on nodes 0 and 1"
-- echo "Running on all nodes"
CMDS
Note that any single call to ` + "`iptb run`" + ` runs *all* commands concurrently. So,
in the above example, there is no guarantee as to the order in which the lines
are printed.
`,
Flags: []cli.Flag{
cli.BoolFlag{
Name: "terminator",
Hidden: true,
},
cli.BoolFlag{
Name: "stdin",
Hidden: true,
},
},
Before: func(c *cli.Context) error {
if c.NArg() == 0 {
finfo, err := os.Stdin.Stat()
if err != nil {
return err
}
if finfo.Size() == 0 && finfo.Mode()&os.ModeNamedPipe == 0 {
return fmt.Errorf("error: no command input and stdin is empty")
}
return c.Set("stdin", "true")
}
if present := isTerminatorPresent(c); present {
return c.Set("terminator", "true")
}

return nil
},
Action: func(c *cli.Context) error {
Expand All @@ -39,26 +92,58 @@ var RunCmd = cli.Command{
return err
}

nodeRange, args := parseCommand(c.Args(), c.IsSet("terminator"))

if nodeRange == "" {
nodeRange = fmt.Sprintf("[0-%d]", len(nodes)-1)
var reader io.Reader
if c.IsSet("stdin") {
reader = bufio.NewReader(os.Stdin)
} else {
var builder strings.Builder
if c.IsSet("terminator") {
builder.WriteString("-- ")
}
for i, arg := range c.Args() {
builder.WriteString(strconv.Quote(arg))
if i != c.NArg()-1 {
builder.WriteString(" ")
}
}
reader = strings.NewReader(builder.String())
}

list, err := parseRange(nodeRange)
if err != nil {
return fmt.Errorf("could not parse node range %s", nodeRange)
var args [][]string
scanner := bufio.NewScanner(reader)
line := 1
for scanner.Scan() {
tokens, err := shellwords.Parse(scanner.Text())
if err != nil {
return fmt.Errorf("parse error on line %d: %s", line, err)
}
if strings.HasPrefix(tokens[0], "#") {
continue
}
args = append(args, tokens)
line++
}

runCmd := func(node testbedi.Core) (testbedi.Output, error) {
return node.RunCmd(context.Background(), nil, args...)
}
ranges := make([][]int, len(args))
runCmds := make([]outputFunc, len(args))
for i, cmd := range args {
nodeRange, tokens := parseCommand(cmd, false)
if nodeRange == "" {
nodeRange = fmt.Sprintf("[0-%d]", len(nodes)-1)
}
list, err := parseRange(nodeRange)
if err != nil {
return fmt.Errorf("could not parse node range %s", nodeRange)
}
ranges[i] = list

results, err := mapWithOutput(list, nodes, runCmd)
if err != nil {
return err
runCmd := func(node testbedi.Core) (testbedi.Output, error) {
return node.RunCmd(context.Background(), nil, tokens...)
}
runCmds[i] = runCmd
}

results, err := mapListWithOutput(ranges, nodes, runCmds)
return buildReport(results)
},
}
43 changes: 43 additions & 0 deletions commands/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func parseCommand(args []string, terminator bool) (string, []string) {
return args[0], []string{}
}

if args[0] == "--" {
return "", args[1:]
}

arguments := args[1:]

if arguments[0] == "--" {
Expand Down Expand Up @@ -128,6 +132,45 @@ type Result struct {

type outputFunc func(testbedi.Core) (testbedi.Output, error)

func mapListWithOutput(ranges [][]int, nodes []testbedi.Core, fns []outputFunc) ([]Result, error) {
var wg sync.WaitGroup
var lk sync.Mutex
var errs []error

total := 0
offsets := make([]int, len(ranges))
for i, list := range ranges {
offsets[i] = total
total += len(list)
}
results := make([]Result, total)

for i, list := range ranges {
wg.Add(1)
go func(i int, list []int) {
defer wg.Done()
results_i, err := mapWithOutput(list, nodes, fns[i])

lk.Lock()
defer lk.Unlock()

if err != nil {
errs = append(errs, err)
}
for j, result := range results_i {
results[offsets[i]+j] = result
}
}(i, list)
wg.Wait()
}

if len(errs) != 0 {
return results, cli.NewMultiError(errs...)
}

return results, nil
}

func mapWithOutput(list []int, nodes []testbedi.Core, fn outputFunc) ([]Result, error) {
var wg sync.WaitGroup
var lk sync.Mutex
Expand Down
6 changes: 6 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
"hash": "QmVmDhyTTUcQXFD1rRQ64fGLMSAoaQvNH3hwuaCFAPq2hy",
"name": "errors",
"version": "0.0.1"
},
{
"author": "mattn",
"hash": "QmbgMitQCJBeteUr9jGMyTRFjhZQjXKW5Kif6UdK4aW6W6",
"name": "go-shellwords",
"version": "0.0.0"
}
],
"gxVersion": "0.6.0",
Expand Down

0 comments on commit 7928974

Please sign in to comment.