Skip to content

Commit

Permalink
Multiple improvements to "plz watch" and "plz run".
Browse files Browse the repository at this point in the history
- Kill all targets started with Parallel or Sequential (including watch --run) on SIGINT
- Don't block "watch" on running targets. Instead, restart targets on notification
  • Loading branch information
katzdm committed Apr 18, 2019
1 parent b8f7ea8 commit fd40a1b
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/please.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ var buildFunctions = map[string]func() bool{
},
"parallel": func() bool {
if success, state := runBuild(opts.Run.Parallel.PositionalArgs.Targets, true, false); success {
os.Exit(run.Parallel(state, state.ExpandOriginalTargets(), opts.Run.Parallel.Args, opts.Run.Parallel.NumTasks, opts.Run.Parallel.Quiet, opts.Run.Env))
os.Exit(run.Parallel(state, state.ExpandOriginalTargets(), opts.Run.Parallel.Args, opts.Run.Parallel.NumTasks, opts.Run.Parallel.Quiet, opts.Run.Env, nil))
}
return false
},
Expand Down
95 changes: 81 additions & 14 deletions src/run/run_step.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
package run

import (
"bytes"
"fmt"
"os"
"os/exec"
"os/signal"
"strings"
"sync"
"syscall"
Expand All @@ -21,22 +23,53 @@ var log = logging.MustGetLogger("run")

// Run implements the running part of 'plz run'.
func Run(state *core.BuildState, label core.BuildLabel, args []string, env bool) {
run(state, label, args, false, false, env)
run(state, label, args, false, false, env, nil, nil)
}

// Parallel runs a series of targets in parallel.
// Returns a relevant exit code (i.e. if at least one subprocess exited unsuccessfully, it will be
// that code, otherwise 0 if all were successful).
func Parallel(state *core.BuildState, labels []core.BuildLabel, args []string, numTasks int, quiet, env bool) int {
func Parallel(state *core.BuildState, labels []core.BuildLabel, args []string, numTasks int, quiet, env bool, killchan <-chan bool) int {
pool := NewGoroutinePool(numTasks)
var g errgroup.Group

pidchan := make(chan int, len(labels))
sigchan := make(chan os.Signal, 1)
// Get notified on SIGINT. Stop listening once this function ends.
signal.Notify(sigchan, syscall.SIGINT)
defer func() {
signal.Stop(sigchan)
close(sigchan)
close(pidchan)
}()

// Start a goroutine to listen for SIGINT, and cancel any running targets if it's received.
go func(){
for range sigchan {
// Got SIGINT - Kill all of the targets, then forward the signal to ourselves.
log.Warning("Got SIGINT; killing running targets")
killRunningTargets(pidchan)
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}
}()

// Listen for a signal to kill all running targets.
if killchan != nil {
go func() {
for range killchan {
killRunningTargets(pidchan)
}
}()
}

// Run the targets.
for _, label := range labels {
label := label // capture locally
g.Go(func() (err error) {
var wg sync.WaitGroup
wg.Add(1)
pool.Submit(func() {
if e := run(state, label, args, true, quiet, env); e != nil {
if e := run(state, label, args, true, quiet, env, pidchan, killchan); e != nil {
err = e
}
wg.Done()
Expand All @@ -56,9 +89,29 @@ func Parallel(state *core.BuildState, labels []core.BuildLabel, args []string, n
// Returns a relevant exit code (i.e. if at least one subprocess exited unsuccessfully, it will be
// that code, otherwise 0 if all were successful).
func Sequential(state *core.BuildState, labels []core.BuildLabel, args []string, quiet, env bool) int {
pidchan := make(chan int, len(labels))
sigchan := make(chan os.Signal, 1)

// Get notified on SIGINT. Stop listening once this function ends.
signal.Notify(sigchan, syscall.SIGINT)
defer func() {
signal.Stop(sigchan)
close(sigchan)
close(pidchan)
}()

// Start a goroutine to listen for SIGINT, and cancel any running targets if it's received.
go func(){
for range sigchan {
// Got SIGINT - Kill all of the targets, then forward the signal to ourselves.
log.Warning("Got SIGINT; killing running targets")
killRunningTargets(pidchan)
}
}()

for _, label := range labels {
log.Notice("Running %s", label)
if err := run(state, label, args, true, quiet, env); err != nil {
if err := run(state, label, args, true, quiet, env, pidchan, nil); err != nil {
log.Error("%s", err)
return err.code
}
Expand All @@ -70,7 +123,7 @@ func Sequential(state *core.BuildState, labels []core.BuildLabel, args []string,
// If fork is true then we fork to run the target and return any error from the subprocesses.
// If it's false this function never returns (because we either win or die; it's like
// Game of Thrones except rather less glamorous).
func run(state *core.BuildState, label core.BuildLabel, args []string, fork, quiet, setenv bool) *exitError {
func run(state *core.BuildState, label core.BuildLabel, args []string, fork, quiet, setenv bool, pidchan chan<- int, killchan <-chan bool) *exitError {
target := state.Graph.TargetOrDie(label)
if !target.IsBinary {
log.Fatalf("Target %s cannot be run; it's not marked as binary", label)
Expand Down Expand Up @@ -103,15 +156,29 @@ func run(state *core.BuildState, label core.BuildLabel, args []string, fork, qui
// Note that we don't connect stdin. It doesn't make sense for multiple processes.
cmd := core.ExecCommand(splitCmd[0], args[1:]...) // args here don't include argv[0]
cmd.Env = env
if !quiet {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
must(cmd.Start(), args)
err := cmd.Wait()
return toExitError(err, cmd, nil)
}
out, err := cmd.CombinedOutput()
return toExitError(err, cmd, out)

var combinedOutput bytes.Buffer // Dump the command output here, for quiet mode.
if quiet {
cmd.Stdout, cmd.Stderr = &combinedOutput, &combinedOutput
} else {
cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
}
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
must(cmd.Start(), args)
if pidchan != nil {
pidchan <- cmd.Process.Pid
}

err := cmd.Wait()
return toExitError(err, cmd, combinedOutput.Bytes())
}

func killRunningTargets(pids <-chan int) {
for pid := range pids {
if groupID, err := syscall.Getpgid(pid); err == nil {
syscall.Kill(-groupID, syscall.SIGKILL)
}
}
}

// environ returns an appropriate environment for a command.
Expand Down
4 changes: 2 additions & 2 deletions src/run/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ func TestSequential(t *testing.T) {

func TestParallel(t *testing.T) {
state, labels1, labels2 := makeState()
code := Parallel(state, labels1, nil, 5, false, false)
code := Parallel(state, labels1, nil, 5, false, false, nil)
assert.Equal(t, 0, code)
code = Parallel(state, labels2, nil, 5, true, false)
code = Parallel(state, labels2, nil, 5, true, false, nil)
assert.Equal(t, 1, code)
}

Expand Down
20 changes: 16 additions & 4 deletions src/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,14 @@ func Watch(state *core.BuildState, labels core.BuildLabels, callback CallbackFun
files := cmap.New()
go startWatching(watcher, state, labels, files)

// We'll send a message on this channel when we're about to start a build, to indicate that any
// targets currently running should be killed.
killchan := make(chan bool, 1)

// The initial setup only builds targets, it doesn't test or run things.
// Do one of those now if requested.
if state.NeedTests || state.NeedRun {
build(state, labels, callback)
build(state, labels, callback, killchan)
}

for {
Expand All @@ -52,6 +56,14 @@ func Watch(state *core.BuildState, labels core.BuildLabels, callback CallbackFun
log.Notice("Skipping notification for %s", event.Name)
continue
}
// Stop any running targets.
if state.NeedTests || state.NeedRun {
log.Warning("Received update notification; killing running targets before " +
"starting new ones")
killchan <- true
close(killchan)
killchan = make(chan bool, 1)
}

// Quick debounce; poll and discard all events for the next brief period.
outer:
Expand All @@ -62,7 +74,7 @@ func Watch(state *core.BuildState, labels core.BuildLabels, callback CallbackFun
break outer
}
}
build(state, labels, callback)
build(state, labels, callback, killchan)
case err := <-watcher.Errors:
log.Error("Error watching files:", err)
}
Expand Down Expand Up @@ -141,7 +153,7 @@ func anyTests(state *core.BuildState, labels []core.BuildLabel) bool {
}

// build invokes a single build while watching.
func build(state *core.BuildState, labels []core.BuildLabel, callback CallbackFunc) {
func build(state *core.BuildState, labels []core.BuildLabel, callback CallbackFunc, killchan <-chan bool) {
// Set up a new state & copy relevant parts off the existing one.
ns := core.NewBuildState(state.Config.Please.NumThreads, state.Cache, state.Verbosity, state.Config)
ns.VerifyHashes = state.VerifyHashes
Expand All @@ -155,6 +167,6 @@ func build(state *core.BuildState, labels []core.BuildLabel, callback CallbackFu
ns.StartTime = time.Now()
callback(ns, labels)
if state.NeedRun {
run.Parallel(state, labels, nil, state.Config.Please.NumThreads, false, false)
go run.Parallel(state, labels, nil, state.Config.Please.NumThreads, false, false, killchan)
}
}

0 comments on commit fd40a1b

Please sign in to comment.