-
Notifications
You must be signed in to change notification settings - Fork 782
/
child.go
481 lines (408 loc) · 12.3 KB
/
child.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
package child
import (
"errors"
"fmt"
"io"
"log"
"math/rand"
"os"
"os/exec"
"strings"
"sync"
"syscall"
"time"
)
func init() {
	// The splay delays drawn in randomSplay come from the package-level
	// math/rand source, so seed that source from the wall clock at load time.
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
var (
// ErrMissingCommand is the error returned by New when the input carries no
// command to run.
ErrMissingCommand = errors.New("missing command")
// ExitCodeOK is the exit code reported on the exit channel when waiting for
// the child returns without an error.
ExitCodeOK = 0
// ExitCodeError is the fallback exit code reported when waiting for the child
// fails and no more specific exit status can be extracted from the error.
ExitCodeError = 127
)
// Child wraps a single child process and exposes methods to start it, send it
// signals, reload or restart it, and terminate it.
type Child struct {
// The embedded RWMutex guards cmd and exitCh: Start, Kill, Stop and the
// restart path of Reload take the write lock, while ExitCh, Pid, Signal and
// the signal path of Reload take the read lock.
sync.RWMutex
// stdin, stdout and stderr are assigned to the child process unchanged.
stdin io.Reader
stdout, stderr io.Writer
// command and args are the executable and its arguments; env is the
// environment assigned to the process.
command string
args []string
env []string
// timeout, when non-zero, makes start block until the process exits or the
// timeout elapses.
timeout time.Duration
// reloadSignal is the signal sent by Reload; when nil, Reload restarts the
// process instead. killSignal is the graceful termination signal; when nil,
// kill skips the graceful phase and force-kills the process.
reloadSignal os.Signal
killSignal os.Signal
// killTimeout bounds how long kill waits after sending killSignal before the
// process is force-killed.
killTimeout time.Duration
// splay is the upper bound of the random delay applied before reload and kill
// signals are sent.
splay time.Duration
// cmd is the actual child process under management.
cmd *exec.Cmd
// exitCh is the channel on which the process's exit code is delivered. A new
// channel is created every time the process is started.
exitCh chan int
// stopLock guards stopped. stopCh is closed when the child is stopped, which
// releases anything still waiting on a splay. stopped records whether the
// child has already been stopped.
stopLock sync.RWMutex
stopCh chan struct{}
stopped bool
// setpgid controls whether the child is given its own process group; New
// forces it off when setsid is requested.
setpgid bool
// setsid controls whether the child is given its own session.
setsid bool
// logger receives the log messages emitted on behalf of this child process.
logger *log.Logger
}
// NewInput is the set of options accepted by New.
type NewInput struct {
// Stdin is the io.Reader the child process reads its input from. Stdout and
// Stderr are the io.Writer objects that receive the child's standard output
// and standard error. All three are handed to the child process unchanged.
Stdin io.Reader
Stdout, Stderr io.Writer
// Command is the name of the command to execute. Args are the list of
// arguments to pass when starting the command. New rejects an empty Command.
Command string
Args []string
// Timeout is the maximum amount of time to allow the command to execute. When
// non-zero, starting the child blocks until it exits or the timeout elapses,
// and a child that outlives the timeout is force-killed. If set to 0, the
// command is permitted to run indefinitely.
Timeout time.Duration
// Env is the environment given to the child process, in key=value format. It
// is the responsibility of the caller to include the parent process's
// environment, if required. Note that the slice is assigned to exec.Cmd.Env
// as-is, and os/exec documents that a nil Env makes the child use the current
// process's environment.
Env []string
// ReloadSignal is the signal to send to reload this process. This value may
// be nil, in which case a reload restarts the process instead.
ReloadSignal os.Signal
// KillSignal is the signal to send to gracefully kill this process. This
// value may be nil, in which case the process is force-killed without a
// graceful signal.
KillSignal os.Signal
// KillTimeout is the amount of time to wait for the process to gracefully
// terminate before force-killing.
KillTimeout time.Duration
// Splay is the maximum random amount of time to wait before sending signals.
// This option helps reduce the thundering herd problem by effectively
// sleeping for a random amount of time before sending the signal. This
// prevents multiple processes from all signaling at the same time. This value
// may be zero (which disables the splay entirely).
Splay time.Duration
// Setsid, if set to true, creates the child process with its own session ID
// and process group ID. Note that this overrides Setpgid below.
Setsid bool
// Setpgid, if set to true, creates the child process with its own process
// group ID while keeping the same session ID as its parent. It is ignored
// when Setsid is set.
Setpgid bool
// Logger is an optional logger for messages pertinent to the child process.
// When nil, New falls back to log.Default().
Logger *log.Logger
}
// New creates a new child process for management with high-level APIs for
// sending signals to the child process, restarting the child process, and
// gracefully terminating the child process. The process is not started until
// Start is called.
//
// It returns ErrMissingCommand when i is nil or carries an empty Command. The
// input is only read, never written: a nil Logger is replaced by log.Default()
// on the returned Child without modifying i.
func New(i *NewInput) (*Child, error) {
	if i == nil || len(i.Command) == 0 {
		return nil, ErrMissingCommand
	}

	// Resolve the logger into a local instead of assigning to i.Logger, so a
	// NewInput that the caller shares or reuses is left exactly as it was.
	logger := i.Logger
	if logger == nil {
		logger = log.Default()
	}

	child := &Child{
		stdin:        i.Stdin,
		stdout:       i.Stdout,
		stderr:       i.Stderr,
		command:      i.Command,
		args:         i.Args,
		env:          i.Env,
		timeout:      i.Timeout,
		reloadSignal: i.ReloadSignal,
		killSignal:   i.KillSignal,
		killTimeout:  i.KillTimeout,
		splay:        i.Splay,
		stopCh:       make(chan struct{}, 1),
		// A new session already implies a new process group, so Setsid wins
		// over Setpgid.
		setpgid: i.Setpgid && !i.Setsid,
		setsid:  i.Setsid,
		logger:  logger,
	}
	return child, nil
}
// ExitCh returns the exit channel of the current child process. The channel is
// replaced whenever the process is started again, so callers must fetch it
// afresh rather than holding on to an earlier value.
func (c *Child) ExitCh() <-chan int {
	c.RLock()
	ch := c.exitCh
	c.RUnlock()
	return ch
}
// Pid reports the process ID of the child process, or 0 when there is no
// running child process.
func (c *Child) Pid() int {
	c.RLock()
	defer c.RUnlock()

	if c.running() {
		return c.cmd.Process.Pid
	}
	return 0
}
// Command returns the command followed by its arguments, joined by single
// spaces, for use in human-readable messages.
func (c *Child) Command() string {
	parts := make([]string, 0, len(c.args)+1)
	parts = append(parts, c.command)
	parts = append(parts, c.args...)
	return strings.Join(parts, " ")
}
// Start launches the child process while holding the write lock. The exit code
// of the process is delivered on the channel returned by ExitCh, not by this
// method. An error is returned when the process cannot be started or, if a
// timeout is configured, when it exits with a non-zero code or fails to exit
// within that timeout.
func (c *Child) Start() error {
	cmdline := c.Command()
	c.logger.Printf("[INFO] (child) spawning: %s", cmdline)

	c.Lock()
	defer c.Unlock()

	err := c.start()
	return err
}
// Signal delivers s to the child process under the read lock and returns any
// error encountered while doing so.
func (c *Child) Signal(s os.Signal) error {
	name := s.String()
	c.logger.Printf("[INFO] (child) receiving signal %q", name)

	c.RLock()
	defer c.RUnlock()

	err := c.signal(s)
	return err
}
// Reload asks the child process to reload. With a reload signal configured the
// signal is sent and no response is awaited. Without one, the running process
// is killed and a fresh process is started in its place on this Child.
func (c *Child) Reload() error {
	if c.reloadSignal != nil {
		c.logger.Printf("[INFO] (child) reloading process")

		// Signalling leaves both the process and its exit channel in place, so
		// a shared read lock is enough.
		c.RLock()
		defer c.RUnlock()
		return c.reload()
	}

	c.logger.Printf("[INFO] (child) restarting process")

	// Restarting swaps out the process, so hold the write lock for the whole
	// transition; this also keeps other goroutines from sending reload signals
	// while the swap is in progress.
	c.Lock()
	defer c.Unlock()

	c.kill(false)
	return c.start()
}
// Kill terminates the child process and returns once that has been done. If a
// splay is configured it first waits a random duration up to that splay, which
// softens the thundering herd problem. It then sends the configured kill
// signal and waits up to KillTimeout for the process to exit, force-killing it
// if it has not. Without a kill signal the process is force-killed directly.
// No error is returned.
func (c *Child) Kill() {
	c.logger.Printf("[INFO] (child) killing process")

	c.Lock()
	defer c.Unlock()

	// A plain Kill always honours the configured splay.
	const skipSplay = false
	c.kill(skipSplay)
}
// Stop terminates the child process like Kill does, and additionally marks the
// Child as stopped so that the exit code of the killed process is not sent on
// the exit channel. It is intended for graceful application shutdown.
func (c *Child) Stop() {
	const skipSplay = false
	c.internalStop(skipSplay)
}
// StopImmediately is Stop without the random splay wait: the child process is
// terminated straight away. It serves fast shutdown of consul-template and its
// children after a kill signal has been received.
func (c *Child) StopImmediately() {
	const skipSplay = true
	c.internalStop(skipSplay)
}
// internalStop is the shared implementation of Stop and StopImmediately. While
// holding both the Child's write lock and stopLock it kills the process
// (skipping the splay when immediately is true), closes stopCh and records the
// Child as stopped. Calling it again on a stopped Child only logs a warning.
func (c *Child) internalStop(immediately bool) {
	c.logger.Printf("[INFO] (child) stopping process")

	c.Lock()
	defer c.Unlock()

	c.stopLock.Lock()
	defer c.stopLock.Unlock()

	if !c.stopped {
		c.kill(immediately)
		close(c.stopCh)
		c.stopped = true
		return
	}

	c.logger.Printf("[WARN] (child) already stopped")
}
// start launches the configured command and records it as the process under
// management. Both callers in this file (Start and Reload) invoke it while
// holding the Child's write lock.
//
// It returns an error when the process cannot be started. When a timeout is
// configured it additionally blocks until the process exits or the timeout
// elapses, and returns an error for a non-zero exit code or for a timeout
// (after force-killing the process). Otherwise it returns nil.
func (c *Child) start() error {
cmd := exec.Command(c.command, c.args...)
cmd.Stdin = c.stdin
cmd.Stdout = c.stdout
cmd.Stderr = c.stderr
cmd.Env = c.env
// setSysProcAttr is defined elsewhere; presumably it applies the process
// group / session settings to cmd before launch — confirm against its
// definition.
setSysProcAttr(cmd, c.setpgid, c.setsid)
// On a failed launch c.cmd and c.exitCh are left as they were.
if err := cmd.Start(); err != nil {
return err
}
c.cmd = cmd
// Create a new exitCh so that previously invoked commands (if any) don't
// cause us to exit, and start a goroutine to wait for that process to end.
// The buffer of one lets the goroutine deliver the code without a receiver.
exitCh := make(chan int, 1)
go func() {
var code int
err := cmd.Wait()
if err == nil {
code = ExitCodeOK
} else {
// Fall back to the generic error code unless the error carries a wait
// status from which the real exit status can be read.
code = ExitCodeError
if exiterr, ok := err.(*exec.ExitError); ok {
if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
code = status.ExitStatus()
}
}
}
// stopped is read under stopLock. Once the child has been stopped, the exit
// code is dropped and the channel is only closed.
c.stopLock.RLock()
defer c.stopLock.RUnlock()
if !c.stopped {
select {
case <-c.stopCh:
case exitCh <- code:
}
}
close(exitCh)
}()
c.exitCh = exitCh
// If a timeout was given, block here until the child exits or the timeout
// elapses.
if c.timeout != 0 {
select {
// NOTE: this receive takes the exit code out of exitCh, so later readers of
// ExitCh only observe the closed channel.
case code := <-exitCh:
if code != 0 {
return fmt.Errorf(
"command exited with a non-zero exit status:\n"+
"\n"+
" %s\n"+
"\n"+
"This is assumed to be a failure. Please ensure the command\n"+
"exits with a zero exit status.",
c.Command(),
)
}
case <-time.After(c.timeout):
// Force-kill the process while holding stopLock. The error returned by Kill
// is ignored.
c.stopLock.Lock()
defer c.stopLock.Unlock()
if c.cmd != nil && c.cmd.Process != nil {
c.cmd.Process.Kill()
}
return fmt.Errorf(
"command did not exit within %q:\n"+
"\n"+
" %s\n"+
"\n"+
"Commands must exit in a timely manner in order for processing to\n"+
"continue. Consider using a process supervisor or utilizing the\n"+
"built-in exec mode instead.",
c.timeout,
c.Command(),
)
}
}
return nil
}
// pid reports the process ID of the managed process, or 0 when no process is
// running. It takes no lock itself; Pid calls it under the read lock.
func (c *Child) pid() int {
	if c.running() {
		return c.cmd.Process.Pid
	}
	return 0
}
// signal delivers s to the managed process, or to its whole process group when
// the child was configured with its own process group. It is a no-op returning
// nil when no process is running, and returns an error when s is not a
// syscall.Signal or when looking up or signalling the process fails.
func (c *Child) signal(s os.Signal) error {
	if !c.running() {
		return nil
	}

	sysSig, isSyscall := s.(syscall.Signal)
	if !isSyscall {
		return fmt.Errorf("bad signal: %s", s)
	}

	target := c.cmd.Process.Pid
	if c.setpgid {
		// A negative pid addresses the process group rather than one process.
		target = -target
	}

	// Going through os.FindProcess keeps signalling a process or a process
	// group portable across platforms.
	proc, err := os.FindProcess(target)
	if err != nil {
		return err
	}
	return proc.Signal(sysSig)
}
// reload waits for the random splay to elapse, or for stopCh to be closed,
// whichever comes first, and then sends the reload signal to the process.
func (c *Child) reload() error {
	stop := c.stopCh
	splay := c.randomSplay()

	select {
	case <-stop:
	case <-splay:
	}

	return c.signal(c.reloadSignal)
}
// kill terminates the managed process. Unless immediately is true it first
// waits for the random splay (or for stopCh to be closed). It then sends the
// configured kill signal and waits up to killTimeout for the process to exit.
// If no kill signal is configured, the signal cannot be sent, or the process
// has not exited in time, the process is force-killed. c.cmd is reset to nil
// before returning. It is a no-op when no process is running.
//
// It takes no lock itself; the callers in this file (Kill, Reload and
// internalStop) hold the Child's write lock.
func (c *Child) kill(immediately bool) {
if !c.running() {
c.logger.Printf("[DEBUG] (child) Kill() called but process dead; not waiting for splay.")
return
} else if immediately {
c.logger.Printf("[DEBUG] (child) Kill() called but performing immediate shutdown; not waiting for splay.")
} else {
// Wait out the splay unless the child is stopped in the meantime.
select {
case <-c.stopCh:
case <-c.randomSplay():
}
}
// exited records that the process is known to be gone. On every return path
// below the deferred function force-kills the process unless exited was set,
// and then forgets the command. The error returned by Kill is ignored.
var exited bool
defer func() {
if !exited {
c.cmd.Process.Kill()
}
c.cmd = nil
}()
// Without a graceful signal, go straight to the deferred force-kill.
if c.killSignal == nil {
return
}
if err := c.signal(c.killSignal); err != nil {
c.logger.Printf("[ERR] (child) Kill failed: %s", err)
// processNotFoundErr is defined elsewhere; presumably it reports that the
// process no longer exists, which makes the force-kill unnecessary.
if processNotFoundErr(err) {
exited = true // checked in defer
}
return
}
// Wait for the process to exit in a separate goroutine so that the wait can
// be bounded by killTimeout below.
killCh := make(chan struct{}, 1)
go func() {
defer close(killCh)
c.cmd.Process.Wait()
}()
// Only the killCh case marks the process as exited; the stopCh and timeout
// cases fall through to the deferred force-kill.
select {
case <-c.stopCh:
case <-killCh:
exited = true
case <-time.After(c.killTimeout):
}
}
// running reports whether a started process is attached to this Child and has
// not been seen to exit. It takes no lock itself.
//
// NOTE: the probe is a non-blocking receive on exitCh. If an exit code is
// sitting in the channel buffer, that receive takes it out, so it is no longer
// available to whoever reads the channel afterwards.
func (c *Child) running() bool {
	select {
	case <-c.exitCh:
		// An exit code (or the channel's close) is visible: the process ended.
		return false
	default:
		// Nothing on the exit channel yet, or no exit channel exists at all.
	}

	if c.cmd == nil {
		return false
	}
	return c.cmd.Process != nil
}
// randomSplay returns a channel that fires after a random duration in
// [0, splay). When the splay is zero or negative the splay is treated as
// disabled and the returned channel fires immediately.
func (c *Child) randomSplay() <-chan time.Time {
	// rand.Int63n panics for a non-positive argument, so a negative splay must
	// be handled here just like a zero one.
	if c.splay <= 0 {
		return time.After(0)
	}

	wait := time.Duration(rand.Int63n(c.splay.Nanoseconds()))
	c.logger.Printf("[DEBUG] (child) waiting %.2fs for random splay", wait.Seconds())
	return time.After(wait)
}