Skip to content

Commit

Permalink
fix(servstate): use WaitDelay to avoid Command.Wait blocking on stdin…
Browse files Browse the repository at this point in the history
…/out/err (#275)

Use os.exec's Cmd.WaitDelay to ensure cmd.Wait returns in a reasonable
timeframe if the goroutines that cmd.Start() uses to copy stdin/out/err
are blocked when copying due to a sub-subprocess holding onto them.
Read more details about the issue in
golang/go#23019 and the proposed solution
(that was added in Go 1.20) in
golang/go#50436.

This solves issue #149, where Patroni
wasn't restarting properly even after a `KILL` signal was sent to it.
I had originally mis-diagnosed this problem as an issue with Pebble
not tracking the process tree of processes that daemonise and change
their process group (which is still an issue, but is not causing this
problem). The Patroni process wasn't being marked as finished at all
due to being blocked on the `cmd.Wait()`. Patroni starts
sub-processes and "forwards" stdin/out/err, so the copy goroutines
block. Thankfully Go 1.20 introduced `WaitDelay` to allow you to
easily work around this exact problem.

The fix itself is [this one-liner]
(#275):

s.cmd.WaitDelay = s.killDelay() * 9 / 10  // 90% of kill-delay

This will really only be a problem for services, but we make the same
change for exec and exec health checks as it won't hurt there
either.

Also, as a drive-by, this PR also canonicalises some log messages: our
style is to start with an uppercase letter (for logs, not errors) and
to use "Cannot X" rather than "Error Xing".

Fixes #149.
  • Loading branch information
benhoyt authored Aug 18, 2023
1 parent 55fdf50 commit 1a66abb
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 50 deletions.
4 changes: 2 additions & 2 deletions internals/daemon/api_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (r logsResponse) ServeHTTP(w http.ResponseWriter, req *http.Request) {
err = encoder.Encode(newJSONLog(<-fifo))
}
if err != nil {
logger.Noticef("error writing logs: %v", err)
logger.Noticef("Cannot write logs: %v", err)
return false
}
flushWriter(w)
Expand Down Expand Up @@ -186,7 +186,7 @@ func (r logsResponse) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// Otherwise encode and output log directly.
err := encoder.Encode(newJSONLog(log))
if err != nil {
logger.Noticef("error writing logs: %v", err)
logger.Noticef("Cannot write logs: %v", err)
return
}
if follow {
Expand Down
8 changes: 4 additions & 4 deletions internals/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (c *Command) canAccess(r *http.Request, user *UserState) accessResult {
if err == nil {
isUser = true
} else if err != errNoID {
logger.Noticef("unexpected error when attempting to get UID: %s", err)
logger.Noticef("Cannot parse UID from remote address %q: %s", r.RemoteAddr, err)
return accessForbidden
}

Expand Down Expand Up @@ -530,7 +530,7 @@ func (d *Daemon) HandleRestart(t restart.RestartType) {
defer d.mu.Unlock()
d.restartSocket = true
default:
logger.Noticef("internal error: restart handler called with unknown restart type: %v", t)
logger.Noticef("Internal error: restart handler called with unknown restart type: %v", t)
}
d.tomb.Kill(nil)
}
Expand Down Expand Up @@ -817,12 +817,12 @@ func (d *Daemon) RebootIsMissing(st *state.State) error {
// might get rolled back!!
restart.ClearReboot(st)
clearReboot(st)
logger.Noticef("pebble was restarted while a system restart was expected, pebble retried to schedule and waited again for a system restart %d times and is giving up", rebootMaxTentatives)
logger.Noticef("Pebble was restarted while a system restart was expected, pebble retried to schedule and waited again for a system restart %d times and is giving up", rebootMaxTentatives)
return nil
}
st.Set("daemon-system-restart-tentative", nTentative)
d.state = st
logger.Noticef("pebble was restarted while a system restart was expected, pebble will try to schedule and wait for a system restart again (tenative %d/%d)", nTentative, rebootMaxTentatives)
logger.Noticef("Pebble was restarted while a system restart was expected, pebble will try to schedule and wait for a system restart again (tenative %d/%d)", nTentative, rebootMaxTentatives)
return errExpectedReboot
}

Expand Down
2 changes: 1 addition & 1 deletion internals/daemon/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (r *resp) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
status := r.Status
bs, err := r.MarshalJSON()
if err != nil {
logger.Noticef("cannot marshal %#v to JSON: %v", *r, err)
logger.Noticef("Cannot marshal %#v to JSON: %v", *r, err)
bs = nil
status = 500
}
Expand Down
3 changes: 3 additions & 0 deletions internals/overlord/checkstate/checkers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strconv"
"strings"
"syscall"
"time"

"github.com/canonical/x-go/strutil/shlex"

Expand All @@ -38,6 +39,7 @@ import (
const (
maxErrorBytes = 10 * 1024
maxErrorLines = 20
execWaitDelay = time.Second
)

// httpChecker is a checker that ensures an HTTP GET at a specified URL returns 20x.
Expand Down Expand Up @@ -167,6 +169,7 @@ func (c *execChecker) check(ctx context.Context) error {
defer ringBuffer.Close()
cmd.Stdout = ringBuffer
cmd.Stderr = ringBuffer
cmd.WaitDelay = execWaitDelay
err = reaper.StartCommand(cmd)
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions internals/overlord/cmdstate/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
const (
connectTimeout = 5 * time.Second
handshakeTimeout = 5 * time.Second
waitDelay = time.Second

wsControl = "control"
wsStdio = "stdio"
Expand Down Expand Up @@ -343,6 +344,7 @@ func (e *execution) do(ctx context.Context, task *state.Task) error {
cmd.Stdin = stdin
cmd.Stdout = stdout
cmd.Stderr = stderr
cmd.WaitDelay = waitDelay

cmd.SysProcAttr = &syscall.SysProcAttr{}
if e.userID != nil && e.groupID != nil {
Expand Down
21 changes: 18 additions & 3 deletions internals/overlord/servstate/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,21 @@ func (s *serviceData) startInternal() error {
s.cmd.Stdout = logWriter
s.cmd.Stderr = logWriter

// Add WaitDelay to ensure cmd.Wait() returns in a reasonable timeframe if
// the goroutines that cmd.Start() uses to copy Stdin/Stdout/Stderr are
// blocked when copying due to a sub-subprocess holding onto them. This
// only happens if the sub-subprocess uses setsid or setpgid to change
// the process group. Read more details in these issues:
//
// - https://github.com/golang/go/issues/23019
// - https://github.com/golang/go/issues/50436
//
// This isn't the original intent of kill-delay, but it seems reasonable
// to reuse it in this context. Use a value slightly less than the kill
// delay (90% of it) to avoid racing with trying to send SIGKILL (to a
// process that has already exited).
s.cmd.WaitDelay = s.killDelay() * 9 / 10 // will only overflow if kill-delay is 32 years!

// Start the process!
logger.Noticef("Service %q starting: %s", serviceName, s.config.Command)
err = reaper.StartCommand(s.cmd)
Expand Down Expand Up @@ -616,9 +631,9 @@ func (s *serviceData) sendSignal(signal string) error {
}

// killDelay reports the duration that this service should be given when being
// asked to shutdown gracefully before being force terminated. The value
// returned will either be the services pre configured value or the default
// kill delay for pebble.
// asked to shut down gracefully before being force-terminated. The value
// returned will either be the service's pre-configured value, or the default
// kill delay if that is not set.
func (s *serviceData) killDelay() time.Duration {
if s.config.KillDelay.IsSet {
return s.config.KillDelay.Value
Expand Down
117 changes: 79 additions & 38 deletions internals/overlord/servstate/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ const (
)

func TestMain(m *testing.M) {
// See TestReaper
// Used by TestReapZombies
if os.Getenv("PEBBLE_TEST_CREATE_ZOMBIE") == "1" {
err := createZombie()
if err != nil {
Expand All @@ -65,6 +65,21 @@ func TestMain(m *testing.M) {
return
}

// Used by TestWaitDelay
if os.Getenv("PEBBLE_TEST_WAITDELAY") == "1" {
// To get WaitDelay to kick in, we need to start a new process with
// setsid (to ensure it has a new process group ID) and passing
// os.Stdout down to the (grand)child process.
cmd := exec.Command("sleep", "10")
cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true}
cmd.Stdout = os.Stdout
err := cmd.Run()
if err != nil {
panic(err)
}
return
}

os.Exit(m.Run())
}

Expand Down Expand Up @@ -378,18 +393,14 @@ func (s *S) TestStartBadCommand(c *C) {
}

func (s *S) TestCurrentUserGroup(c *C) {
dir := c.MkDir()
err := os.Mkdir(filepath.Join(dir, "layers"), 0755)
c.Assert(err, IsNil)
s.manager, err = servstate.NewManager(s.st, s.runner, dir, nil, nil, fakeLogManager{})
c.Assert(err, IsNil)
s.setupEmptyServiceManager(c)

current, err := user.Current()
c.Assert(err, IsNil)
group, err := user.LookupGroupId(current.Gid)
c.Assert(err, IsNil)

outputPath := filepath.Join(dir, "output")
outputPath := filepath.Join(c.MkDir(), "output")
layer := parseLayer(c, 0, "layer", fmt.Sprintf(`
services:
usrtest:
Expand Down Expand Up @@ -540,11 +551,7 @@ services:
}

func (s *S) TestAppendLayer(c *C) {
dir := c.MkDir()
err := os.Mkdir(filepath.Join(dir, "layers"), 0755)
c.Assert(err, IsNil)
s.manager, err = servstate.NewManager(s.st, s.runner, dir, nil, nil, fakeLogManager{})
c.Assert(err, IsNil)
s.setupEmptyServiceManager(c)

// Append a layer when there are no layers.
layer := parseLayer(c, 0, "label1", `
Expand All @@ -553,7 +560,7 @@ services:
override: replace
command: /bin/sh
`)
err = s.manager.AppendLayer(layer)
err := s.manager.AppendLayer(layer)
c.Assert(err, IsNil)
c.Assert(layer.Order, Equals, 1)
c.Assert(planYAML(c, s.manager), Equals, `
Expand Down Expand Up @@ -622,11 +629,7 @@ services:
}

func (s *S) TestCombineLayer(c *C) {
dir := c.MkDir()
err := os.Mkdir(filepath.Join(dir, "layers"), 0755)
c.Assert(err, IsNil)
s.manager, err = servstate.NewManager(s.st, s.runner, dir, nil, nil, fakeLogManager{})
c.Assert(err, IsNil)
s.setupEmptyServiceManager(c)

// "Combine" layer with no layers should just append.
layer := parseLayer(c, 0, "label1", `
Expand All @@ -635,7 +638,7 @@ services:
override: replace
command: /bin/sh
`)
err = s.manager.CombineLayer(layer)
err := s.manager.CombineLayer(layer)
c.Assert(err, IsNil)
c.Assert(layer.Order, Equals, 1)
c.Assert(planYAML(c, s.manager), Equals, `
Expand Down Expand Up @@ -735,11 +738,7 @@ services:
}

func (s *S) TestSetServiceArgs(c *C) {
dir := c.MkDir()
err := os.Mkdir(filepath.Join(dir, "layers"), 0755)
c.Assert(err, IsNil)
s.manager, err = servstate.NewManager(s.st, s.runner, dir, nil, nil, fakeLogManager{})
c.Assert(err, IsNil)
s.setupEmptyServiceManager(c)

// Append a layer with a few services having default args.
layer := parseLayer(c, 0, "base-layer", `
Expand All @@ -754,7 +753,7 @@ services:
override: replace
command: foo
`)
err = s.manager.AppendLayer(layer)
err := s.manager.AppendLayer(layer)
c.Assert(err, IsNil)
c.Assert(layer.Order, Equals, 1)
s.planLayersHasLen(c, s.manager, 1)
Expand Down Expand Up @@ -1601,20 +1600,16 @@ func (s *S) TestStopRunningNoServices(c *C) {
}

func (s *S) TestNoWorkingDir(c *C) {
dir := c.MkDir()
err := os.Mkdir(filepath.Join(dir, "layers"), 0755)
c.Assert(err, IsNil)
s.manager, err = servstate.NewManager(s.st, s.runner, dir, nil, nil, fakeLogManager{})
c.Assert(err, IsNil)
s.setupEmptyServiceManager(c)

outputPath := filepath.Join(dir, "output")
outputPath := filepath.Join(c.MkDir(), "output")
layer := parseLayer(c, 0, "layer", fmt.Sprintf(`
services:
nowrkdir:
override: replace
command: /bin/sh -c "pwd >%s; %s; sleep %g"
`, outputPath, s.insertDoneCheck(c, "nowrkdir"), shortOkayDelay.Seconds()+0.01))
err = s.manager.AppendLayer(layer)
err := s.manager.AppendLayer(layer)
c.Assert(err, IsNil)

// Service command should run in current directory (package directory)
Expand All @@ -1632,12 +1627,9 @@ services:
}

func (s *S) TestWorkingDir(c *C) {
dir := c.MkDir()
err := os.Mkdir(filepath.Join(dir, "layers"), 0755)
c.Assert(err, IsNil)
s.manager, err = servstate.NewManager(s.st, s.runner, dir, nil, nil, fakeLogManager{})
c.Assert(err, IsNil)
s.setupEmptyServiceManager(c)

dir := c.MkDir()
outputPath := filepath.Join(dir, "output")
layer := parseLayer(c, 0, "layer", fmt.Sprintf(`
services:
Expand All @@ -1646,7 +1638,7 @@ services:
command: /bin/sh -c "pwd >%s; %s; sleep %g"
working-dir: %s
`, outputPath, s.insertDoneCheck(c, "wrkdir"), shortOkayDelay.Seconds()+0.01, dir))
err = s.manager.AppendLayer(layer)
err := s.manager.AppendLayer(layer)
c.Assert(err, IsNil)

chg := s.startServices(c, []string{"wrkdir"}, 1)
Expand All @@ -1661,6 +1653,44 @@ services:
c.Check(string(output), Equals, dir+"\n")
}

func (s *S) TestWaitDelay(c *C) {
s.setupEmptyServiceManager(c)

// Run the test binary with PEBBLE_TEST_WAITDELAY=1 (see TestMain).
testExecutable, err := os.Executable()
c.Assert(err, IsNil)
layer := parseLayer(c, 0, "layer", fmt.Sprintf(`
services:
waitdelay:
override: replace
command: %s
environment:
PEBBLE_TEST_WAITDELAY: 1
kill-delay: 50ms
`, testExecutable))
err = s.manager.AppendLayer(layer)
c.Assert(err, IsNil)

// Start service and wait for it to be started
chg := s.startServices(c, []string{"waitdelay"}, 1)
s.st.Lock()
c.Assert(chg.Err(), IsNil)
s.st.Unlock()
s.waitUntilService(c, "waitdelay", func(svc *servstate.ServiceInfo) bool {
return svc.Current == servstate.StatusActive
})

// Try to stop the service; it will only stop if WaitDelay logic is working,
// otherwise the goroutine waiting for the child's stdout will never finish.
chg = s.stopServices(c, []string{"waitdelay"}, 1)
s.st.Lock()
c.Assert(chg.Err(), IsNil)
s.st.Unlock()
s.waitUntilService(c, "waitdelay", func(svc *servstate.ServiceInfo) bool {
return svc.Current == servstate.StatusInactive
})
}

// setupDefaultServiceManager provides a basic setup that can be used by many
// of the unit tests without having to create a custom setup.
func (s *S) setupDefaultServiceManager(c *C) {
Expand All @@ -1677,6 +1707,16 @@ func (s *S) setupDefaultServiceManager(c *C) {
c.Assert(err, IsNil)
}

// setupEmptyServiceManager sets up a service manager with no layers for tests
// that want to customize them.
func (s *S) setupEmptyServiceManager(c *C) {
dir := c.MkDir()
err := os.Mkdir(filepath.Join(dir, "layers"), 0755)
c.Assert(err, IsNil)
s.manager, err = servstate.NewManager(s.st, s.runner, dir, nil, nil, fakeLogManager{})
c.Assert(err, IsNil)
}

// Make sure services are all stopped before the next test starts.
func (s *S) stopRunningServices(c *C) {
taskSet, err := servstate.StopRunning(s.st, s.manager)
Expand Down Expand Up @@ -1898,6 +1938,7 @@ func getChildSubreaper() (bool, error) {
}

func createZombie() error {
// Run the test binary with PEBBLE_TEST_ZOMBIE_CHILD=1 (see TestMain)
testExecutable, err := os.Executable()
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internals/overlord/stateengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (se *StateEngine) Ensure() error {
for _, m := range se.managers {
err := m.Ensure()
if err != nil {
logger.Noticef("state ensure error: %v", err)
logger.Noticef("State ensure error: %v", err)
errs = append(errs, err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion internals/reaper/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func StartCommand(cmd *exec.Cmd) error {
case ch <- -1:
default:
}
logger.Noticef("internal error: new PID %d observed while still being tracked", cmd.Process.Pid)
logger.Noticef("Internal error: new PID %d observed while still being tracked", cmd.Process.Pid)
}
// Channel is 1-buffered so the send in reapOnce never blocks, if for
// some reason someone forgets to call WaitCommand.
Expand Down

0 comments on commit 1a66abb

Please sign in to comment.