Skip to content

Commit

Permalink
Checks transition to passing/critical only after reaching a consecutive success/…
Browse files Browse the repository at this point in the history
…failure threshold

A check may be configured to become passing/critical only after a specified number of
consecutive checks return passing/critical. Until that threshold is reached, the
status remains unchanged.
This feature is available for HTTP, TCP, gRPC, Docker & Monitor checks.
  • Loading branch information
PHBourquin committed Sep 12, 2019
1 parent 5c1104b commit 24409ee
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 94 deletions.
22 changes: 13 additions & 9 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2356,6 +2356,9 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
if chkType.OutputMaxSize > 0 && maxOutputSize > chkType.OutputMaxSize {
maxOutputSize = chkType.OutputMaxSize
}

statusHandler := checks.NewStatusHandler(a.State, a.logger, chkType.SuccessBeforePassing, chkType.FailuresBeforeCritical)

switch {

case chkType.IsTTL():
Expand Down Expand Up @@ -2395,7 +2398,6 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
tlsClientConfig := a.tlsConfigurator.OutgoingTLSConfigForCheck(chkType.TLSSkipVerify)

http := &checks.CheckHTTP{
Notify: a.State,
CheckID: check.CheckID,
HTTP: chkType.HTTP,
Header: chkType.Header,
Expand All @@ -2405,6 +2407,7 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
Logger: a.logger,
OutputMaxSize: maxOutputSize,
TLSClientConfig: tlsClientConfig,
StatusHandler: statusHandler,
}
http.Start()
a.checkHTTPs[check.CheckID] = http
Expand All @@ -2421,12 +2424,12 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}

tcp := &checks.CheckTCP{
Notify: a.State,
CheckID: check.CheckID,
TCP: chkType.TCP,
Interval: chkType.Interval,
Timeout: chkType.Timeout,
Logger: a.logger,
CheckID: check.CheckID,
TCP: chkType.TCP,
Interval: chkType.Interval,
Timeout: chkType.Timeout,
Logger: a.logger,
StatusHandler: statusHandler,
}
tcp.Start()
a.checkTCPs[check.CheckID] = tcp
Expand All @@ -2448,13 +2451,13 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}

grpc := &checks.CheckGRPC{
Notify: a.State,
CheckID: check.CheckID,
GRPC: chkType.GRPC,
Interval: chkType.Interval,
Timeout: chkType.Timeout,
Logger: a.logger,
TLSClientConfig: tlsClientConfig,
StatusHandler: statusHandler,
}
grpc.Start()
a.checkGRPCs[check.CheckID] = grpc
Expand All @@ -2481,14 +2484,14 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}

dockerCheck := &checks.CheckDocker{
Notify: a.State,
CheckID: check.CheckID,
DockerContainerID: chkType.DockerContainerID,
Shell: chkType.Shell,
ScriptArgs: chkType.ScriptArgs,
Interval: chkType.Interval,
Logger: a.logger,
Client: a.dockerClient,
StatusHandler: statusHandler,
}
if prev := a.checkDockers[check.CheckID]; prev != nil {
prev.Stop()
Expand All @@ -2514,6 +2517,7 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
Timeout: chkType.Timeout,
Logger: a.logger,
OutputMaxSize: maxOutputSize,
StatusHandler: statusHandler,
}
monitor.Start()
a.checkMonitors[check.CheckID] = monitor
Expand Down
115 changes: 75 additions & 40 deletions agent/checks/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type CheckNotifier interface {
// CheckMonitor is used to periodically invoke a script to
// determine the health of a given check. It is compatible with
// nagios plugins and expects the output in the same format.
// Supports failures_before_critical and success_before_passing.
type CheckMonitor struct {
Notify CheckNotifier
CheckID types.CheckID
Expand All @@ -64,6 +65,7 @@ type CheckMonitor struct {
Timeout time.Duration
Logger *log.Logger
OutputMaxSize int
StatusHandler *StatusHandler

stop bool
stopCh chan struct{}
Expand Down Expand Up @@ -182,8 +184,7 @@ func (c *CheckMonitor) check() {
// Check if the check passed
outputStr := truncateAndLogOutput()
if err == nil {
c.Logger.Printf("[DEBUG] agent: Check %q is passing", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, outputStr)
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, outputStr)
return
}

Expand All @@ -193,16 +194,14 @@ func (c *CheckMonitor) check() {
if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
code := status.ExitStatus()
if code == 1 {
c.Logger.Printf("[WARN] agent: Check %q is now warning", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, api.HealthWarning, outputStr)
c.StatusHandler.updateCheck(c.CheckID, api.HealthWarning, outputStr)
return
}
}
}

// Set the health as critical
c.Logger.Printf("[WARN] agent: Check %q is now critical", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, outputStr)
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, outputStr)
}

// CheckTTL is used to apply a TTL to check status,
Expand Down Expand Up @@ -305,8 +304,8 @@ func (c *CheckTTL) SetStatus(status, output string) string {
// The check is warning if the response code is 429.
// The check is critical if the response code is anything else
// or if the request returns an error
// Supports failures_before_critical and success_before_passing.
type CheckHTTP struct {
Notify CheckNotifier
CheckID types.CheckID
HTTP string
Header map[string][]string
Expand All @@ -316,6 +315,7 @@ type CheckHTTP struct {
Logger *log.Logger
TLSClientConfig *tls.Config
OutputMaxSize int
StatusHandler *StatusHandler

httpClient *http.Client
stop bool
Expand Down Expand Up @@ -392,8 +392,7 @@ func (c *CheckHTTP) check() {

req, err := http.NewRequest(method, c.HTTP, nil)
if err != nil {
c.Logger.Printf("[WARN] agent: Check %q HTTP request failed: %s", c.CheckID, err)
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error())
return
}

Expand All @@ -417,8 +416,7 @@ func (c *CheckHTTP) check() {

resp, err := c.httpClient.Do(req)
if err != nil {
c.Logger.Printf("[WARN] agent: Check %q HTTP request failed: %s", c.CheckID, err)
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error())
return
}
defer resp.Body.Close()
Expand All @@ -434,34 +432,30 @@ func (c *CheckHTTP) check() {

if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
// PASSING (2xx)
c.Logger.Printf("[DEBUG] agent: Check %q is passing", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, result)

c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, result)
} else if resp.StatusCode == 429 {
// WARNING
// 429 Too Many Requests (RFC 6585)
// The user has sent too many requests in a given amount of time.
c.Logger.Printf("[WARN] agent: Check %q is now warning", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, api.HealthWarning, result)

c.StatusHandler.updateCheck(c.CheckID, api.HealthWarning, result)
} else {
// CRITICAL
c.Logger.Printf("[WARN] agent: Check %q is now critical", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, result)
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, result)
}
}

// CheckTCP is used to periodically make an TCP/UDP connection to
// determine the health of a given check.
// The check is passing if the connection succeeds
// The check is critical if the connection returns an error
// Supports failures_before_critical and success_before_passing.
type CheckTCP struct {
Notify CheckNotifier
CheckID types.CheckID
TCP string
Interval time.Duration
Timeout time.Duration
Logger *log.Logger
CheckID types.CheckID
TCP string
Interval time.Duration
Timeout time.Duration
Logger *log.Logger
StatusHandler *StatusHandler

dialer *net.Dialer
stop bool
Expand Down Expand Up @@ -522,20 +516,19 @@ func (c *CheckTCP) check() {
conn, err := c.dialer.Dial(`tcp`, c.TCP)
if err != nil {
c.Logger.Printf("[WARN] agent: Check %q socket connection failed: %s", c.CheckID, err)
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error())
return
}
conn.Close()
c.Logger.Printf("[DEBUG] agent: Check %q is passing", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP))
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP))
}

// CheckDocker is used to periodically invoke a script to
// determine the health of an application running inside a
// Docker Container. We assume that the script is compatible
// with nagios plugins and expects the output in the same format.
// Supports failures_before_critical and success_before_passing.
type CheckDocker struct {
Notify CheckNotifier
CheckID types.CheckID
Script string
ScriptArgs []string
Expand All @@ -544,6 +537,7 @@ type CheckDocker struct {
Interval time.Duration
Logger *log.Logger
Client *DockerClient
StatusHandler *StatusHandler

stop chan struct{}
}
Expand Down Expand Up @@ -605,12 +599,7 @@ func (c *CheckDocker) check() {
}
c.Logger.Printf("[TRACE] agent: Check %q output: %s", c.CheckID, out)
}

if status == api.HealthCritical {
c.Logger.Printf("[WARN] agent: Check %q is now critical", c.CheckID)
}

c.Notify.UpdateCheck(c.CheckID, status, out)
c.StatusHandler.updateCheck(c.CheckID, status, out)
}

func (c *CheckDocker) doCheck() (string, *circbuf.Buffer, error) {
Expand Down Expand Up @@ -653,14 +642,15 @@ func (c *CheckDocker) doCheck() (string, *circbuf.Buffer, error) {
// The check is passing if returned status is SERVING.
// The check is critical if connection fails or returned status is
// not SERVING.
// Supports failures_before_critical and success_before_passing.
type CheckGRPC struct {
Notify CheckNotifier
CheckID types.CheckID
GRPC string
Interval time.Duration
Timeout time.Duration
TLSClientConfig *tls.Config
Logger *log.Logger
StatusHandler *StatusHandler

probe *GrpcHealthProbe
stop bool
Expand Down Expand Up @@ -699,11 +689,9 @@ func (c *CheckGRPC) run() {
// check performs a single gRPC health probe and reports the outcome to the
// StatusHandler: critical with the error text on failure, passing with a
// success message otherwise.
func (c *CheckGRPC) check() {
	if err := c.probe.Check(); err != nil {
		c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error())
		return
	}
	c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("gRPC check %s: success", c.GRPC))
}

Expand All @@ -715,3 +703,50 @@ func (c *CheckGRPC) Stop() {
close(c.stopCh)
}
}

// StatusHandler keeps track of consecutive success and failure counts and
// ensures that a status is forwarded to the inner CheckNotifier as
// passing/critical only once the number of consecutive events reaches the
// configured threshold. Until then the previously reported status stands.
type StatusHandler struct {
	inner                  CheckNotifier // receives the update once a threshold is reached
	logger                 *log.Logger
	successBeforePassing   int // consecutive successes required before reporting passing/warning
	successCounter         int // current consecutive-success streak
	failuresBeforeCritical int // consecutive failures required before reporting critical
	failuresCounter        int // current consecutive-failure streak
}

// NewStatusHandler returns a StatusHandler wrapping inner. Both counters are
// seeded at their respective thresholds so that the very first check result
// updates the status immediately, before any streak has accumulated.
func NewStatusHandler(inner CheckNotifier, logger *log.Logger, successBeforePassing, failuresBeforeCritical int) *StatusHandler {
	h := &StatusHandler{
		inner:                  inner,
		logger:                 logger,
		successBeforePassing:   successBeforePassing,
		failuresBeforeCritical: failuresBeforeCritical,
	}
	// Start at threshold: the first updateCheck call passes the gate right away.
	h.successCounter = successBeforePassing
	h.failuresCounter = failuresBeforeCritical
	return h
}

// updateCheck records one check result and forwards the status to the inner
// CheckNotifier only once the consecutive success/failure streak reaches its
// configured threshold; below the threshold it only logs the pending state.
// Passing and warning both count as success; anything else counts as failure.
// Reaching one threshold resets the opposite streak's counter.
func (s *StatusHandler) updateCheck(checkID types.CheckID, status, output string) {
	healthy := status == api.HealthPassing || status == api.HealthWarning

	if healthy {
		s.successCounter++
		if s.successCounter < s.successBeforePassing {
			s.logger.Printf("[WARN] agent: Check %q was %q but has not reached success threshold %d/%d", checkID, status, s.successCounter, s.successBeforePassing)
			return
		}
		s.logger.Printf("[DEBUG] agent: Check %q is %q", checkID, status)
		s.inner.UpdateCheck(checkID, status, output)
		s.failuresCounter = 0
		return
	}

	s.failuresCounter++
	if s.failuresCounter < s.failuresBeforeCritical {
		s.logger.Printf("[WARN] agent: Check %q failed but has not reached failure threshold %d/%d", checkID, s.failuresCounter, s.failuresBeforeCritical)
		return
	}
	s.logger.Printf("[WARN] agent: Check %q is now critical", checkID)
	s.inner.UpdateCheck(checkID, status, output)
	s.successCounter = 0
}
Loading

0 comments on commit 24409ee

Please sign in to comment.