Skip to content
This repository has been archived by the owner on Nov 5, 2021. It is now read-only.

Commit

Permalink
Handle external probe process termination more gracefully.
Browse files Browse the repository at this point in the history
= Use the probe's start context to start the external probe processes. This makes sure that external probe processes are killed cleanly once the start context is canceled.
= On SIGINT and SIGTERM, cancel the start context and wait for a graceful shutdown period (default: 5s) before exiting.

This is in response to #413. I had been planning for this anyway. We should also wire the init context properly and make sure we can clean up properly.

PiperOrigin-RevId: 318986425
  • Loading branch information
manugarg committed Jun 30, 2020
1 parent c003b5c commit 5c1b573
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 12 deletions.
17 changes: 16 additions & 1 deletion cmd/cloudprober.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"os"
"os/signal"
"runtime/pprof"
"syscall"
"time"

"cloud.google.com/go/compute/metadata"
"flag"
Expand All @@ -42,6 +44,7 @@ import (
var (
configFile = flag.String("config_file", "", "Config file")
versionFlag = flag.Bool("version", false, "Print version and exit")
stopTime = flag.Duration("stop_time", 5*time.Second, "How long to wait for cleanup before process exits on SIGINT and SIGTERM")
cpuprofile = flag.String("cpuprof", "", "Write cpu profile to file")
memprofile = flag.String("memprof", "", "Write heap profile to file")
configTest = flag.Bool("configtest", false, "Dry run to test config file")
Expand Down Expand Up @@ -181,7 +184,19 @@ func main() {
// web.Init sets up web UI for cloudprober.
web.Init()

cloudprober.Start(context.Background())
// Set up signal handling for the cancelation of the start context.
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
startCtx, cancelF := context.WithCancel(context.Background())

go func() {
sig := <-sigs
glog.Warningf("Received signal \"%v\", canceling the start context and waiting for %v before closing", sig, *stopTime)
cancelF()
time.Sleep(*stopTime)
os.Exit(0)
}()
cloudprober.Start(startCtx)

// Wait forever
select {}
Expand Down
29 changes: 18 additions & 11 deletions probes/external/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func substituteLabels(in string, labels map[string]string) (string, bool) {
return output, foundAll
}

func (p *Probe) startCmdIfNotRunning() error {
func (p *Probe) startCmdIfNotRunning(startCtx context.Context) error {
// Start external probe command if it's not running already. Note that here we
// are trusting the cmdRunning to be set correctly. It can be false for 3 reasons:
// 1) This is the first call and the process has actually never been started.
Expand All @@ -223,7 +223,7 @@ func (p *Probe) startCmdIfNotRunning() error {
return nil
}
p.l.Infof("Starting external command: %s %s", p.cmdName, strings.Join(p.cmdArgs, " "))
cmd := exec.Command(p.cmdName, p.cmdArgs...)
cmd := exec.CommandContext(startCtx, p.cmdName, p.cmdArgs...)
var err error
if p.cmdStdin, err = cmd.StdinPipe(); err != nil {
return err
Expand Down Expand Up @@ -254,6 +254,13 @@ func (p *Probe) startCmdIfNotRunning() error {
err := cmd.Wait()
close(doneChan)
p.cmdRunning = false

// Spare logging error message if killed explicitly.
select {
case <-startCtx.Done():
return
}

if err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
p.l.Errorf("external probe process died with the status: %s. Stderr: %s", exitErr.Error(), string(exitErr.Stderr))
Expand Down Expand Up @@ -399,12 +406,12 @@ func (p *Probe) processProbeResult(ps *probeStatus, result *result) {
}
}

func (p *Probe) runServerProbe(ctx context.Context) {
func (p *Probe) runServerProbe(ctx, startCtx context.Context) {
requests := make(map[int32]requestInfo)
var requestsMu sync.RWMutex
doneChan := make(chan struct{})

if err := p.startCmdIfNotRunning(); err != nil {
if err := p.startCmdIfNotRunning(startCtx); err != nil {
p.l.Error(err.Error())
return
}
Expand Down Expand Up @@ -552,21 +559,21 @@ func (p *Probe) updateTargets() {
}
}

func (p *Probe) runProbe(ctx context.Context) {
ctxTimeout, cancelFunc := context.WithTimeout(ctx, p.opts.Timeout)
func (p *Probe) runProbe(startCtx context.Context) {
probeCtx, cancelFunc := context.WithTimeout(startCtx, p.opts.Timeout)
defer cancelFunc()

p.updateTargets()

if p.mode == "server" {
p.runServerProbe(ctxTimeout)
p.runServerProbe(probeCtx, startCtx)
} else {
p.runOnceProbe(ctxTimeout)
p.runOnceProbe(probeCtx)
}
}

// Start starts and runs the probe indefinitely.
func (p *Probe) Start(ctx context.Context, dataChan chan *metrics.EventMetrics) {
func (p *Probe) Start(startCtx context.Context, dataChan chan *metrics.EventMetrics) {
p.dataChan = dataChan

ticker := time.NewTicker(p.opts.Interval)
Expand All @@ -575,11 +582,11 @@ func (p *Probe) Start(ctx context.Context, dataChan chan *metrics.EventMetrics)
for range ticker.C {
// Don't run another probe if context is canceled already.
select {
case <-ctx.Done():
case <-startCtx.Done():
return
default:
}

p.runProbe(ctx)
p.runProbe(startCtx)
}
}

0 comments on commit 5c1b573

Please sign in to comment.