Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: reap zombie process and opt-out with no-reap flag #172

Merged
merged 11 commits into from
Oct 10, 2024
4 changes: 2 additions & 2 deletions cron/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package cron
import (
"context"
"fmt"
"io/ioutil"
"io"
"regexp"
"strings"
"sync"
Expand Down Expand Up @@ -41,7 +41,7 @@ func (hook *testHook) Levels() []logrus.Level {

func newTestLogger() (*logrus.Entry, chan *logrus.Entry) {
logger := logrus.New()
logger.Out = ioutil.Discard
logger.Out = io.Discard
logger.Level = logrus.DebugLevel

channel := make(chan *logrus.Entry, TEST_CHANNEL_BUFFER_SIZE)
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/evalphobia/logrus_sentry v0.8.2
github.com/fsnotify/fsnotify v1.7.0
github.com/prometheus/client_golang v1.20.2
github.com/ramr/go-reaper v0.2.1
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
)
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ github.com/prometheus/common v0.57.0 h1:Ro/rKjwdq9mZn1K5QPctzh+MA4Lp0BuYk5ZZEVho
github.com/prometheus/common v0.57.0/go.mod h1:7uRPFSUTbfZWsJ7MHY56sqt7hLQu3bxXHDnNhl8E9qI=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/ramr/go-reaper v0.2.1 h1:zww+wlQOvTjBZuk1920R/e0GFEb6O7+B0WQLV6dM924=
github.com/ramr/go-reaper v0.2.1/go.mod h1:AVypdzrcCXjSc/JYnlXl8TsB+z84WyFzxWE8Jh0MOJc=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
Expand Down
1 change: 1 addition & 0 deletions integration/normal.crontab
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* * * * * * * echo 1
32 changes: 26 additions & 6 deletions integration/test.bats
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,34 @@ wait_for() {
! run_supercronic -test "${BATS_TEST_DIRNAME}/invalid.crontab"
}

@test "reap zombie process" {
@test "it run as pid 1 and reap zombie process" {
out="${WORK_DIR}/zombie-crontab-out"

# run in new process namespace
sudo timeout 10s unshare --fork --pid --mount-proc \
${BATS_TEST_DIRNAME}/../supercronic "${BATS_TEST_DIRNAME}/zombie.crontab" &
${BATS_TEST_DIRNAME}/../supercronic "${BATS_TEST_DIRNAME}/zombie.crontab" >"$out" 2>&1 &
local pid=$!
sleep 1.5
run bash -c "ps axo pid=,stat=|grep Z"
sleep 3

kill -TERM ${pid}

[[ "$status" -eq 1 ]]
# todo: use other method to detect zombie cleanup
wait_for grep "reaper cleanup: pid=" "$out"
}


@test "it run as pid 1 and normal crontab no error" {
out="${WORK_DIR}/normal-crontab-out"

# sleep 30 seconds occur found bug
# FIXME: other way to detect
sudo timeout 30s unshare --fork --pid --mount-proc \
"${BATS_TEST_DIRNAME}/../supercronic" "${BATS_TEST_DIRNAME}/normal.crontab" >"$out" 2>&1 &
# https://github.com/aptible/supercronic/issues/171
local pid=$!
local foundErr

sleep 29.5
kill -TERM ${pid}
grep "waitid: no child processes" "$out" && foundErr=1
[[ $foundErr != 1 ]]
}
2 changes: 1 addition & 1 deletion integration/zombie.crontab
Original file line number Diff line number Diff line change
@@ -1 +1 @@
* * * * * * * /bin/sleep 1 & exec /bin/sleep 0
* * * * * * * /bin/sleep 0.1 & exec /bin/sleep 0
6 changes: 3 additions & 3 deletions log/hook/splitstream.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package hook

import (
"github.com/sirupsen/logrus"
"io"
"io/ioutil"

"github.com/sirupsen/logrus"
)

type writerHook struct {
Expand All @@ -25,7 +25,7 @@ func (h *writerHook) Fire(entry *logrus.Entry) error {
}

func RegisterSplitLogger(logger *logrus.Logger, outWriter io.Writer, errWriter io.Writer) {
logger.SetOutput(ioutil.Discard)
logger.SetOutput(io.Discard)

logger.AddHook(&writerHook{
writer: outWriter,
Expand Down
27 changes: 20 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/aptible/supercronic/prometheus_metrics"
"github.com/evalphobia/logrus_sentry"
"github.com/fsnotify/fsnotify"
reaper "github.com/ramr/go-reaper"
"github.com/sirupsen/logrus"
)

Expand All @@ -31,6 +30,8 @@ func main() {
json := flag.Bool("json", false, "enable JSON logging")
test := flag.Bool("test", false, "test crontab (does not run jobs)")
inotify := flag.Bool("inotify", false, "use inotify to detect crontab file changes")
// If this flag changes, update forkExec to disable reaping in the child process
disableReap := flag.Bool("no-reap", false, "disable reaping of dead processes, note: reaping requires pid 1")
prometheusListen := flag.String(
"prometheus-listen-address",
"",
Expand Down Expand Up @@ -101,7 +102,19 @@ func main() {
os.Exit(2)
return
}

if !*disableReap {
if os.Getpid() == 1 {
// Clean up zombie processes caused by incorrect crontab commands
// Use forkExec to avoid random waitid errors
// https://github.com/aptible/supercronic/issues/88
// https://github.com/aptible/supercronic/issues/171
logrus.Info("reaping dead processes")
forkExec()
return
}

logrus.Warn("process reaping disabled, not pid 1")
}
crontabFileName := flag.Args()[0]

var watcher *fsnotify.Watcher
Expand Down Expand Up @@ -165,12 +178,8 @@ func main() {
}()
}

// Start background reaping of orphaned child processes.
go reaper.Reap()
// _ = reaper.Reap

termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR2)
signal.Notify(termChan, signalList...)

if *inotify {
go func() {
Expand Down Expand Up @@ -266,3 +275,7 @@ func readCrontabAtPath(path string) (*crontab.Crontab, error) {

return crontab.ParseCrontab(file)
}

var signalList = []os.Signal{
syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR2,
}
123 changes: 123 additions & 0 deletions reaper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package main

import (
"os"
"os/signal"
"syscall"

"github.com/sirupsen/logrus"
)

func forkExec() {

// run supercronic in other process
pwd, err := os.Getwd()
if err != nil {
logrus.Fatalf("Failed to get current working directory: %s", err.Error())
return
}

pattrs := &syscall.ProcAttr{
Dir: pwd,
Env: os.Environ(),
Files: []uintptr{
uintptr(syscall.Stdin),
uintptr(syscall.Stdout),
uintptr(syscall.Stderr),
},
}
args := make([]string, 0, len(os.Args)+1)
// disable reaping for supercronic, avoid no sense warning
args = append(args, os.Args[0], "-no-reap")
args = append(args, os.Args[1:]...)

pid, err := syscall.ForkExec(args[0], args, pattrs)
if err != nil {
logrus.Fatalf("Failed to fork exec: %s", err.Error())
return
}

// forward signal to supercronic
signalToFork(pid)
// got supercronic exit status
wstatus := reapChildren(pid)
os.Exit(wstatus.ExitStatus())
}

func signalToFork(pid int) {
p, err := os.FindProcess(pid)
if err != nil {
logrus.Fatalf("Failed findProcess supercronic pid:%d,%s", pid, err.Error())
}
termChan := make(chan os.Signal, 1)
signal.Notify(termChan, signalList...)
go func() {
for {
s := <-termChan
if err := p.Signal(s); err != nil {
logrus.Errorf("Failed to send signal to supercronic: %s", err.Error())
}
}
}()
}

// copy from https://github.com/ramr/go-reaper
// modify for wait exit status of supercronic
// without modify, supercronic exit status may not be obtained

// Be a good parent - clean up behind the children.
func reapChildren(superCrondPid int) syscall.WaitStatus {
var notifications = make(chan os.Signal, 1)

go sigChildHandler(notifications)

// all child
const rpid = -1
var wstatus syscall.WaitStatus

for {
var sig = <-notifications
logrus.Debugf("reaper received signal %v\n", sig)
for {
pid, err := syscall.Wait4(rpid, &wstatus, 0, nil)
for syscall.EINTR == err {
pid, err = syscall.Wait4(pid, &wstatus, 0, nil)
}

if syscall.ECHILD == err {
break
}

if superCrondPid == pid {
logrus.Debugf("supercronic exit, pid=%d, wstatus=%+v, err=%+v\n", pid, wstatus, err)
return wstatus
}
// note: change output need change test
logrus.Warnf("reaper cleanup: pid=%d, wstatus=%+v\n",
pid, wstatus)
}
}

}

// Handle death of child (SIGCHLD) messages. Pushes the signal onto the
// notifications channel if there is a waiter.
func sigChildHandler(notifications chan os.Signal) {
var sigs = make(chan os.Signal, 3)
signal.Notify(sigs, syscall.SIGCHLD)

for {
var sig = <-sigs
select {
case notifications <- sig: /* published it. */
default:
/*
* Notifications channel full - drop it to the
* floor. This ensures we don't fill up the SIGCHLD
* queue. The reaper just waits for any child
* process (pid=-1), so we ain't loosing it!! ;^)
*/
}
}

} /* End of function sigChildHandler. */
Loading