diff --git a/src/please.go b/src/please.go index d7bf98cafd..bfe376a6ff 100644 --- a/src/please.go +++ b/src/please.go @@ -428,7 +428,7 @@ var buildFunctions = map[string]func() bool{ }, "parallel": func() bool { if success, state := runBuild(opts.Run.Parallel.PositionalArgs.Targets, true, false); success { - os.Exit(run.Parallel(state, state.ExpandOriginalTargets(), opts.Run.Parallel.Args, opts.Run.Parallel.NumTasks, opts.Run.Parallel.Quiet, opts.Run.Env)) + os.Exit(run.Parallel(state, state.ExpandOriginalTargets(), opts.Run.Parallel.Args, opts.Run.Parallel.NumTasks, opts.Run.Parallel.Quiet, opts.Run.Env, nil)) } return false }, diff --git a/src/run/run_step.go b/src/run/run_step.go index d91c00f923..bdaf9ca14f 100644 --- a/src/run/run_step.go +++ b/src/run/run_step.go @@ -2,9 +2,11 @@ package run import ( + "bytes" "fmt" "os" "os/exec" + "os/signal" "strings" "sync" "syscall" @@ -21,22 +23,53 @@ var log = logging.MustGetLogger("run") // Run implements the running part of 'plz run'. func Run(state *core.BuildState, label core.BuildLabel, args []string, env bool) { - run(state, label, args, false, false, env) + run(state, label, args, false, false, env, nil, nil) } // Parallel runs a series of targets in parallel. // Returns a relevant exit code (i.e. if at least one subprocess exited unsuccessfully, it will be // that code, otherwise 0 if all were successful). -func Parallel(state *core.BuildState, labels []core.BuildLabel, args []string, numTasks int, quiet, env bool) int { +func Parallel(state *core.BuildState, labels []core.BuildLabel, args []string, numTasks int, quiet, env bool, killchan <-chan bool) int { pool := NewGoroutinePool(numTasks) var g errgroup.Group + + pidchan := make(chan int, len(labels)) + sigchan := make(chan os.Signal, 1) + // Get notified on SIGINT. Stop listening once this function ends. + signal.Notify(sigchan, syscall.SIGINT) + defer func() { + signal.Stop(sigchan) + close(sigchan) + close(pidchan) + }() + + // Start a goroutine to listen for SIGINT, and cancel any running targets if it's received. + go func(){ + for range sigchan { + // Got SIGINT - Kill all of the targets, then forward the signal to ourselves. + log.Warning("Got SIGINT; killing running targets") + killRunningTargets(pidchan) + syscall.Kill(syscall.Getpid(), syscall.SIGINT) + } + }() + + // Listen for a signal to kill all running targets. + if killchan != nil { + go func() { + for range killchan { + killRunningTargets(pidchan) + } + }() + } + + // Run the targets. for _, label := range labels { label := label // capture locally g.Go(func() (err error) { var wg sync.WaitGroup wg.Add(1) pool.Submit(func() { - if e := run(state, label, args, true, quiet, env); e != nil { + if e := run(state, label, args, true, quiet, env, pidchan, killchan); e != nil { err = e } wg.Done() @@ -56,9 +89,29 @@ func Parallel(state *core.BuildState, labels []core.BuildLabel, args []string, n // Returns a relevant exit code (i.e. if at least one subprocess exited unsuccessfully, it will be // that code, otherwise 0 if all were successful). func Sequential(state *core.BuildState, labels []core.BuildLabel, args []string, quiet, env bool) int { + pidchan := make(chan int, len(labels)) + sigchan := make(chan os.Signal, 1) + + // Get notified on SIGINT. Stop listening once this function ends. + signal.Notify(sigchan, syscall.SIGINT) + defer func() { + signal.Stop(sigchan) + close(sigchan) + close(pidchan) + }() + + // Start a goroutine to listen for SIGINT, and cancel any running targets if it's received. + go func(){ + for range sigchan { + // Got SIGINT - Kill all of the targets, then forward the signal to ourselves. + log.Warning("Got SIGINT; killing running targets") + killRunningTargets(pidchan) + } + }() + for _, label := range labels { log.Notice("Running %s", label) - if err := run(state, label, args, true, quiet, env); err != nil { + if err := run(state, label, args, true, quiet, env, pidchan, nil); err != nil { log.Error("%s", err) return err.code } @@ -70,7 +123,7 @@ func Sequential(state *core.BuildState, labels []core.BuildLabel, args []string, // If fork is true then we fork to run the target and return any error from the subprocesses. // If it's false this function never returns (because we either win or die; it's like // Game of Thrones except rather less glamorous). -func run(state *core.BuildState, label core.BuildLabel, args []string, fork, quiet, setenv bool) *exitError { +func run(state *core.BuildState, label core.BuildLabel, args []string, fork, quiet, setenv bool, pidchan chan<- int, killchan <-chan bool) *exitError { target := state.Graph.TargetOrDie(label) if !target.IsBinary { log.Fatalf("Target %s cannot be run; it's not marked as binary", label) @@ -103,15 +156,29 @@ func run(state *core.BuildState, label core.BuildLabel, args []string, fork, qui // Note that we don't connect stdin. It doesn't make sense for multiple processes. cmd := core.ExecCommand(splitCmd[0], args[1:]...) // args here don't include argv[0] cmd.Env = env - if !quiet { - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - must(cmd.Start(), args) - err := cmd.Wait() - return toExitError(err, cmd, nil) - } - out, err := cmd.CombinedOutput() - return toExitError(err, cmd, out) + + var combinedOutput bytes.Buffer // Dump the command output here, for quiet mode. + if quiet { + cmd.Stdout, cmd.Stderr = &combinedOutput, &combinedOutput + } else { + cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr + } + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + must(cmd.Start(), args) + if pidchan != nil { + pidchan <- cmd.Process.Pid + } + + err := cmd.Wait() + return toExitError(err, cmd, combinedOutput.Bytes()) +} + +func killRunningTargets(pids <-chan int) { + for pid := range pids { + if groupID, err := syscall.Getpgid(pid); err == nil { + syscall.Kill(-groupID, syscall.SIGKILL) + } + } } // environ returns an appropriate environment for a command. diff --git a/src/run/run_test.go b/src/run/run_test.go index 8e03dce6c5..e17f1987cc 100644 --- a/src/run/run_test.go +++ b/src/run/run_test.go @@ -25,9 +25,9 @@ func TestSequential(t *testing.T) { func TestParallel(t *testing.T) { state, labels1, labels2 := makeState() - code := Parallel(state, labels1, nil, 5, false, false) + code := Parallel(state, labels1, nil, 5, false, false, nil) assert.Equal(t, 0, code) - code = Parallel(state, labels2, nil, 5, true, false) + code = Parallel(state, labels2, nil, 5, true, false, nil) assert.Equal(t, 1, code) } diff --git a/src/watch/watch.go b/src/watch/watch.go index be8531c926..de5c01876b 100644 --- a/src/watch/watch.go +++ b/src/watch/watch.go @@ -38,10 +38,14 @@ func Watch(state *core.BuildState, labels core.BuildLabels, callback CallbackFun files := cmap.New() go startWatching(watcher, state, labels, files) + // We'll send a message on this channel when we're about to start a build, to indicate that any + // targets currently running should be killed. + killchan := make(chan bool, 1) + // The initial setup only builds targets, it doesn't test or run things. // Do one of those now if requested. if state.NeedTests || state.NeedRun { - build(state, labels, callback) + build(state, labels, callback, killchan) } for { @@ -52,6 +56,14 @@ func Watch(state *core.BuildState, labels core.BuildLabels, callback CallbackFun log.Notice("Skipping notification for %s", event.Name) continue } + // Stop any running targets. + if state.NeedTests || state.NeedRun { + log.Warning("Received update notification; killing running targets before " + + "starting new ones") + killchan <- true + close(killchan) + killchan = make(chan bool, 1) + } // Quick debounce; poll and discard all events for the next brief period. outer: @@ -62,7 +74,7 @@ func Watch(state *core.BuildState, labels core.BuildLabels, callback CallbackFun break outer } } - build(state, labels, callback) + build(state, labels, callback, killchan) case err := <-watcher.Errors: log.Error("Error watching files:", err) } @@ -141,7 +153,7 @@ func anyTests(state *core.BuildState, labels []core.BuildLabel) bool { } // build invokes a single build while watching. -func build(state *core.BuildState, labels []core.BuildLabel, callback CallbackFunc) { +func build(state *core.BuildState, labels []core.BuildLabel, callback CallbackFunc, killchan <-chan bool) { // Set up a new state & copy relevant parts off the existing one. ns := core.NewBuildState(state.Config.Please.NumThreads, state.Cache, state.Verbosity, state.Config) ns.VerifyHashes = state.VerifyHashes @@ -155,6 +167,6 @@ func build(state *core.BuildState, labels []core.BuildLabel, callback CallbackFu ns.StartTime = time.Now() callback(ns, labels) if state.NeedRun { - run.Parallel(state, labels, nil, state.Config.Please.NumThreads, false, false) + go run.Parallel(state, labels, nil, state.Config.Please.NumThreads, false, false, killchan) } }