diff --git a/.travis.yml b/.travis.yml index 992d2b2..c59ad9c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,13 +4,11 @@ script: - cd hystrix - go test -race go: - - 1.3.3 - - 1.4.3 - - 1.5.4 - - 1.6.0 - - 1.6.1 - - 1.6.2 + - 1.6.x + - 1.7.x + - 1.8.x + - 1.9.x - tip env: global: - - GORACE="halt_on_error=1" \ No newline at end of file + - GORACE="halt_on_error=1" diff --git a/hystrix/circuit.go b/hystrix/circuit.go index b92d98f..0d961b0 100644 --- a/hystrix/circuit.go +++ b/hystrix/circuit.go @@ -170,7 +170,10 @@ func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, return fmt.Errorf("no event types sent for metrics") } - if eventTypes[0] == "success" && circuit.open { + circuit.mutex.RLock() + o := circuit.open + circuit.mutex.RUnlock() + if eventTypes[0] == "success" && o { circuit.setClose() } diff --git a/hystrix/circuit_test.go b/hystrix/circuit_test.go index b51180d..cf6747b 100644 --- a/hystrix/circuit_test.go +++ b/hystrix/circuit_test.go @@ -7,6 +7,8 @@ import ( "time" . "github.com/smartystreets/goconvey/convey" + "testing/quick" + "math/rand" ) func TestGetCircuit(t *testing.T) { @@ -94,3 +96,54 @@ func TestReportEventOpenThenClose(t *testing.T) { }) }) } + +func TestReportEventMultiThreaded(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + run := func() bool { + defer Flush() + // Make the circuit easily open and close intermittently. + ConfigureCommand("", CommandConfig{ + MaxConcurrentRequests: 1, + ErrorPercentThreshold: 1, + RequestVolumeThreshold: 1, + SleepWindow: 10, + }) + cb, _, _ := GetCircuit("") + count := 5 + wg := &sync.WaitGroup{} + wg.Add(count) + c := make(chan bool, count) + for i := 0; i < count; i++ { + go func() { + defer func() { + if r := recover(); r != nil { + t.Error(r) + c <- false + } else { + wg.Done() + } + }() + // randomized eventType to open/close circuit + eventType := "rejected" + if rand.Intn(3) == 1 { + eventType = "success" + } + err := cb.ReportEvent([]string{eventType}, time.Now(), time.Second) + if err != nil { + t.Error(err) + } + time.Sleep(time.Millisecond) + // cb.IsOpen() internally calls cb.setOpen() + cb.IsOpen() + }() + } + go func() { + wg.Wait() + c <- true + }() + return <-c + } + if err := quick.Check(run, nil); err != nil { + t.Error(err) + } +} diff --git a/hystrix/hystrix.go b/hystrix/hystrix.go index b7127d9..be72da4 100644 --- a/hystrix/hystrix.go +++ b/hystrix/hystrix.go @@ -29,13 +29,11 @@ type command struct { start time.Time errChan chan error finished chan bool - fallbackOnce *sync.Once circuit *CircuitBreaker run runFunc fallback fallbackFunc runDuration time.Duration events []string - timedOut bool } var ( @@ -59,7 +57,6 @@ func Go(name string, run runFunc, fallback fallbackFunc) chan error { start: time.Now(), errChan: make(chan error, 1), finished: make(chan bool, 1), - fallbackOnce: &sync.Once{}, } // dont have methods with explicit params and returns @@ -72,6 +69,29 @@ func Go(name string, run runFunc, fallback fallbackFunc) chan error { return cmd.errChan } cmd.circuit = circuit + ticketCond := sync.NewCond(cmd) + ticketChecked := false + // When the caller extracts error from returned errChan, it's assumed that + // the ticket's been returned to executorPool. Therefore, returnTicket() can + // not run after cmd.errorWithFallback(). + returnTicket := func() { + cmd.Lock() + // Avoid releasing before a ticket is acquired. + for !ticketChecked { + ticketCond.Wait() + } + cmd.circuit.executorPool.Return(cmd.ticket) + cmd.Unlock() + } + // Shared by the following two goroutines. It ensures only the faster + // goroutine runs errWithFallback() and reportAllEvent(). + returnOnce := &sync.Once{} + reportAllEvent := func() { + err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration) + if err != nil { + log.Print(err) + } + } go func() { defer func() { cmd.finished <- true }() @@ -80,7 +100,16 @@ func Go(name string, run runFunc, fallback fallbackFunc) chan error { // Rejecting new executions allows backends to recover, and the circuit will allow // new traffic when it feels a healthly state has returned. if !cmd.circuit.AllowRequest() { - cmd.errorWithFallback(ErrCircuitOpen) + cmd.Lock() + // It's safe for another goroutine to go ahead releasing a nil ticket. + ticketChecked = true + ticketCond.Signal() + cmd.Unlock() + returnOnce.Do(func() { + returnTicket() + cmd.errorWithFallback(ErrCircuitOpen) + reportAllEvent() + }) return } @@ -92,51 +121,48 @@ func Go(name string, run runFunc, fallback fallbackFunc) chan error { cmd.Lock() select { case cmd.ticket = <-circuit.executorPool.Tickets: + ticketChecked = true + ticketCond.Signal() cmd.Unlock() default: + ticketChecked = true + ticketCond.Signal() cmd.Unlock() - cmd.errorWithFallback(ErrMaxConcurrency) + returnOnce.Do(func() { + returnTicket() + cmd.errorWithFallback(ErrMaxConcurrency) + reportAllEvent() + }) return } runStart := time.Now() runErr := run() - - if !cmd.isTimedOut() { + returnOnce.Do(func() { + defer reportAllEvent() cmd.runDuration = time.Since(runStart) - + returnTicket() if runErr != nil { cmd.errorWithFallback(runErr) return } - cmd.reportEvent("success") - } + }) }() go func() { - defer func() { - cmd.Lock() - cmd.circuit.executorPool.Return(cmd.ticket) - cmd.Unlock() - - err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration) - if err != nil { - log.Print(err) - } - }() - timer := time.NewTimer(getSettings(name).Timeout) defer timer.Stop() select { case <-cmd.finished: + // returnOnce has been executed in another goroutine case <-timer.C: - cmd.Lock() - cmd.timedOut = true - cmd.Unlock() - - cmd.errorWithFallback(ErrTimeout) + returnOnce.Do(func() { + returnTicket() + cmd.errorWithFallback(ErrTimeout) + reportAllEvent() + }) return } }() @@ -191,33 +217,22 @@ func (c *command) reportEvent(eventType string) { c.events = append(c.events, eventType) } -func (c *command) isTimedOut() bool { - c.Lock() - defer c.Unlock() - - return c.timedOut -} - // errorWithFallback triggers the fallback while reporting the appropriate metric events. -// If called multiple times for a single command, only the first will execute to insure -// accurate metrics and prevent the fallback from executing more than once. func (c *command) errorWithFallback(err error) { - c.fallbackOnce.Do(func() { - eventType := "failure" - if err == ErrCircuitOpen { - eventType = "short-circuit" - } else if err == ErrMaxConcurrency { - eventType = "rejected" - } else if err == ErrTimeout { - eventType = "timeout" - } + eventType := "failure" + if err == ErrCircuitOpen { + eventType = "short-circuit" + } else if err == ErrMaxConcurrency { + eventType = "rejected" + } else if err == ErrTimeout { + eventType = "timeout" + } - c.reportEvent(eventType) - fallbackErr := c.tryFallback(err) - if fallbackErr != nil { - c.errChan <- fallbackErr - } - }) + c.reportEvent(eventType) + fallbackErr := c.tryFallback(err) + if fallbackErr != nil { + c.errChan <- fallbackErr + } } func (c *command) tryFallback(err error) error { diff --git a/hystrix/hystrix_test.go b/hystrix/hystrix_test.go index 3a34125..b0e308a 100644 --- a/hystrix/hystrix_test.go +++ b/hystrix/hystrix_test.go @@ -6,6 +6,7 @@ import ( "time" . "github.com/smartystreets/goconvey/convey" + "testing/quick" ) func TestSuccess(t *testing.T) { @@ -339,6 +340,30 @@ func TestFallbackAfterRejected(t *testing.T) { }) } +func TestReturnTicket_QuickCheck(t *testing.T) { + compareTicket := func() bool { + defer Flush() + ConfigureCommand("", CommandConfig{Timeout: 2}) + errChan := Go("", func() error { + c := make(chan struct{}) + <-c // should block + return nil + }, nil) + err := <-errChan + So(err, ShouldResemble, ErrTimeout) + cb, _, err := GetCircuit("") + So(err, ShouldBeNil) + return cb.executorPool.ActiveCount() == 0 + } + + Convey("with a run command that doesn't return", t, func() { + Convey("checking many times that after Go(), the ticket returns to the pool after the timeout", func() { + err := quick.Check(compareTicket, nil) + So(err, ShouldBeNil) + }) + }) +} + func TestReturnTicket(t *testing.T) { Convey("with a run command that doesn't return", t, func() { defer Flush() @@ -351,7 +376,7 @@ func TestReturnTicket(t *testing.T) { return nil }, nil) - Convey("the ticket returns to the pool after the timeout", func() { + Convey("after Go(), the ticket returns to the pool after the timeout", func() { err := <-errChan So(err, ShouldResemble, ErrTimeout) diff --git a/hystrix/metric_collector/default_metric_collector.go b/hystrix/metric_collector/default_metric_collector.go index 7deda02..aa59f03 100644 --- a/hystrix/metric_collector/default_metric_collector.go +++ b/hystrix/metric_collector/default_metric_collector.go @@ -2,7 +2,6 @@ package metricCollector import ( "sync" - "time" "github.com/afex/hystrix-go/hystrix/rolling" ) @@ -115,82 +114,22 @@ func (d *DefaultMetricCollector) RunDuration() *rolling.Timing { return d.runDuration } -// IncrementAttempts increments the number of requests seen in the latest time bucket. -func (d *DefaultMetricCollector) IncrementAttempts() { +func (d *DefaultMetricCollector) Update(r MetricResult) { d.mutex.RLock() defer d.mutex.RUnlock() - d.numRequests.Increment(1) -} - -// IncrementErrors increments the number of errors seen in the latest time bucket. -// Errors are any result from an attempt that is not a success. -func (d *DefaultMetricCollector) IncrementErrors() { - d.mutex.RLock() - defer d.mutex.RUnlock() - d.errors.Increment(1) -} - -// IncrementSuccesses increments the number of successes seen in the latest time bucket. -func (d *DefaultMetricCollector) IncrementSuccesses() { - d.mutex.RLock() - defer d.mutex.RUnlock() - d.successes.Increment(1) -} - -// IncrementFailures increments the number of failures seen in the latest time bucket. -func (d *DefaultMetricCollector) IncrementFailures() { - d.mutex.RLock() - defer d.mutex.RUnlock() - d.failures.Increment(1) -} - -// IncrementRejects increments the number of rejected requests seen in the latest time bucket. -func (d *DefaultMetricCollector) IncrementRejects() { - d.mutex.RLock() - defer d.mutex.RUnlock() - d.rejects.Increment(1) -} - -// IncrementShortCircuits increments the number of rejected requests seen in the latest time bucket. -func (d *DefaultMetricCollector) IncrementShortCircuits() { - d.mutex.RLock() - defer d.mutex.RUnlock() - d.shortCircuits.Increment(1) -} - -// IncrementTimeouts increments the number of requests that timed out in the latest time bucket. -func (d *DefaultMetricCollector) IncrementTimeouts() { - d.mutex.RLock() - defer d.mutex.RUnlock() - d.timeouts.Increment(1) -} -// IncrementFallbackSuccesses increments the number of successful calls to the fallback function in the latest time bucket. -func (d *DefaultMetricCollector) IncrementFallbackSuccesses() { - d.mutex.RLock() - defer d.mutex.RUnlock() - d.fallbackSuccesses.Increment(1) -} + d.numRequests.Increment(r.Attempts) + d.errors.Increment(r.Errors) + d.successes.Increment(r.Successes) + d.failures.Increment(r.Failures) + d.rejects.Increment(r.Rejects) + d.shortCircuits.Increment(r.ShortCircuits) + d.timeouts.Increment(r.Timeouts) + d.fallbackSuccesses.Increment(r.FallbackSuccesses) + d.fallbackFailures.Increment(r.FallbackFailures) -// IncrementFallbackFailures increments the number of failed calls to the fallback function in the latest time bucket. -func (d *DefaultMetricCollector) IncrementFallbackFailures() { - d.mutex.RLock() - defer d.mutex.RUnlock() - d.fallbackFailures.Increment(1) -} - -// UpdateTotalDuration updates the total amount of time this circuit has been running. -func (d *DefaultMetricCollector) UpdateTotalDuration(timeSinceStart time.Duration) { - d.mutex.RLock() - defer d.mutex.RUnlock() - d.totalDuration.Add(timeSinceStart) -} - -// UpdateRunDuration updates the amount of time the latest request took to complete. -func (d *DefaultMetricCollector) UpdateRunDuration(runDuration time.Duration) { - d.mutex.RLock() - defer d.mutex.RUnlock() - d.runDuration.Add(runDuration) + d.totalDuration.Add(r.TotalDuration) + d.runDuration.Add(r.RunDuration) } // Reset resets all metrics in this collector to 0. diff --git a/hystrix/metric_collector/metric_collector.go b/hystrix/metric_collector/metric_collector.go index 2972572..2142693 100644 --- a/hystrix/metric_collector/metric_collector.go +++ b/hystrix/metric_collector/metric_collector.go @@ -39,34 +39,26 @@ func (m *metricCollectorRegistry) Register(initMetricCollector func(string) Metr m.registry = append(m.registry, initMetricCollector) } +type MetricResult struct { + Attempts float64 + Errors float64 + Successes float64 + Failures float64 + Rejects float64 + ShortCircuits float64 + Timeouts float64 + FallbackSuccesses float64 + FallbackFailures float64 + TotalDuration time.Duration + RunDuration time.Duration +} + // MetricCollector represents the contract that all collectors must fulfill to gather circuit statistics. // Implementations of this interface do not have to maintain locking around thier data stores so long as // they are not modified outside of the hystrix context. type MetricCollector interface { - // IncrementAttempts increments the number of updates. - IncrementAttempts() - // IncrementErrors increments the number of unsuccessful attempts. - // Attempts minus Errors will equal successes within a time range. - // Errors are any result from an attempt that is not a success. - IncrementErrors() - // IncrementSuccesses increments the number of requests that succeed. - IncrementSuccesses() - // IncrementFailures increments the number of requests that fail. - IncrementFailures() - // IncrementRejects increments the number of requests that are rejected. - IncrementRejects() - // IncrementShortCircuits increments the number of requests that short circuited due to the circuit being open. - IncrementShortCircuits() - // IncrementTimeouts increments the number of timeouts that occurred in the circuit breaker. - IncrementTimeouts() - // IncrementFallbackSuccesses increments the number of successes that occurred during the execution of the fallback function. - IncrementFallbackSuccesses() - // IncrementFallbackFailures increments the number of failures that occurred during the execution of the fallback function. - IncrementFallbackFailures() - // UpdateTotalDuration updates the internal counter of how long we've run for. - UpdateTotalDuration(timeSinceStart time.Duration) - // UpdateRunDuration updates the internal counter of how long the last run took. - UpdateRunDuration(runDuration time.Duration) + // Update accepts a set of metrics from a command execution for remote instrumentation + Update(MetricResult) // Reset resets the internal counters and timers. Reset() } diff --git a/hystrix/metrics.go b/hystrix/metrics.go index 0fe8c78..8f564c7 100644 --- a/hystrix/metrics.go +++ b/hystrix/metrics.go @@ -67,47 +67,43 @@ func (m *metricExchange) Monitor() { func (m *metricExchange) IncrementMetrics(wg *sync.WaitGroup, collector metricCollector.MetricCollector, update *commandExecution, totalDuration time.Duration) { // granular metrics + r := metricCollector.MetricResult{ + Attempts: 1, + TotalDuration: totalDuration, + RunDuration: update.RunDuration, + } + if update.Types[0] == "success" { - collector.IncrementAttempts() - collector.IncrementSuccesses() + r.Successes = 1 } if update.Types[0] == "failure" { - collector.IncrementFailures() - - collector.IncrementAttempts() - collector.IncrementErrors() + r.Failures = 1 + r.Errors = 1 } if update.Types[0] == "rejected" { - collector.IncrementRejects() - - collector.IncrementAttempts() - collector.IncrementErrors() + r.Rejects = 1 + r.Errors = 1 } if update.Types[0] == "short-circuit" { - collector.IncrementShortCircuits() - - collector.IncrementAttempts() - collector.IncrementErrors() + r.ShortCircuits = 1 + r.Errors = 1 } if update.Types[0] == "timeout" { - collector.IncrementTimeouts() - - collector.IncrementAttempts() - collector.IncrementErrors() + r.Timeouts = 1 + r.Errors = 1 } if len(update.Types) > 1 { // fallback metrics if update.Types[1] == "fallback-success" { - collector.IncrementFallbackSuccesses() + r.FallbackSuccesses = 1 } if update.Types[1] == "fallback-failure" { - collector.IncrementFallbackFailures() + r.FallbackFailures = 1 } } - collector.UpdateTotalDuration(totalDuration) - collector.UpdateRunDuration(update.RunDuration) + collector.Update(r) wg.Done() } diff --git a/hystrix/rolling/rolling.go b/hystrix/rolling/rolling.go index 5a69481..eef9c62 100644 --- a/hystrix/rolling/rolling.go +++ b/hystrix/rolling/rolling.go @@ -51,6 +51,10 @@ func (r *Number) removeOldBuckets() { // Increment increments the number in current timeBucket. func (r *Number) Increment(i float64) { + if i == 0 { + return + } + r.Mutex.Lock() defer r.Mutex.Unlock() diff --git a/plugins/datadog_collector.go b/plugins/datadog_collector.go index 57cc394..68e170b 100644 --- a/plugins/datadog_collector.go +++ b/plugins/datadog_collector.go @@ -1,7 +1,6 @@ package plugins import ( - "time" // Developed on https://github.com/DataDog/datadog-go/tree/a27810dd518c69be741a7fd5d0e39f674f615be8 "github.com/DataDog/datadog-go/statsd" @@ -115,68 +114,41 @@ func NewDatadogCollectorWithClient(client DatadogClient) func(string) metricColl } } -// IncrementAttempts increments the number of calls to this circuit. -func (dc *DatadogCollector) IncrementAttempts() { - dc.client.Count(DM_Attempts, 1, dc.tags, 1.0) -} - -// IncrementErrors increments the number of unsuccessful attempts. -// Attempts minus Errors will equal successes within a time range. -// Errors are any result from an attempt that is not a success. -func (dc *DatadogCollector) IncrementErrors() { - dc.client.Count(DM_Errors, 1, dc.tags, 1.0) -} - -// IncrementSuccesses increments the number of requests that succeed. -func (dc *DatadogCollector) IncrementSuccesses() { - dc.client.Gauge(DM_CircuitOpen, 0, dc.tags, 1.0) - dc.client.Count(DM_Successes, 1, dc.tags, 1.0) -} - -// IncrementFailures increments the number of requests that fail. -func (dc *DatadogCollector) IncrementFailures() { - dc.client.Count(DM_Failures, 1, dc.tags, 1.0) -} - -// IncrementRejects increments the number of requests that are rejected. -func (dc *DatadogCollector) IncrementRejects() { - dc.client.Count(DM_Rejects, 1, dc.tags, 1.0) -} - -// IncrementShortCircuits increments the number of requests that short circuited -// due to the circuit being open. -func (dc *DatadogCollector) IncrementShortCircuits() { - dc.client.Gauge(DM_CircuitOpen, 1, dc.tags, 1.0) - dc.client.Count(DM_ShortCircuits, 1, dc.tags, 1.0) -} - -// IncrementTimeouts increments the number of timeouts that occurred in the -// circuit breaker. -func (dc *DatadogCollector) IncrementTimeouts() { - dc.client.Count(DM_Timeouts, 1, dc.tags, 1.0) -} - -// IncrementFallbackSuccesses increments the number of successes that occurred -// during the execution of the fallback function. -func (dc *DatadogCollector) IncrementFallbackSuccesses() { - dc.client.Count(DM_FallbackSuccesses, 1, dc.tags, 1.0) -} - -// IncrementFallbackFailures increments the number of failures that occurred -// during the execution of the fallback function. -func (dc *DatadogCollector) IncrementFallbackFailures() { - dc.client.Count(DM_FallbackFailures, 1, dc.tags, 1.0) -} +func (dc *DatadogCollector) Update(r metricCollector.MetricResult) { + if r.Attempts > 0 { + dc.client.Count(DM_Attempts, int64(r.Attempts), dc.tags, 1.0) + } + if r.Errors > 0 { + dc.client.Count(DM_Errors, int64(r.Errors), dc.tags, 1.0) + } + if r.Successes > 0 { + dc.client.Gauge(DM_CircuitOpen, 0, dc.tags, 1.0) + dc.client.Count(DM_Successes, int64(r.Successes), dc.tags, 1.0) + } + if r.Failures > 0 { + dc.client.Count(DM_Failures, int64(r.Failures), dc.tags, 1.0) + } + if r.Rejects > 0 { + dc.client.Count(DM_Rejects, int64(r.Rejects), dc.tags, 1.0) + } + if r.ShortCircuits > 0 { + dc.client.Gauge(DM_CircuitOpen, 1, dc.tags, 1.0) + dc.client.Count(DM_ShortCircuits, int64(r.ShortCircuits), dc.tags, 1.0) + } + if r.Timeouts > 0 { + dc.client.Count(DM_Timeouts, int64(r.Timeouts), dc.tags, 1.0) + } + if r.FallbackSuccesses > 0 { + dc.client.Count(DM_FallbackSuccesses, int64(r.FallbackSuccesses), dc.tags, 1.0) + } + if r.FallbackFailures > 0 { + dc.client.Count(DM_FallbackFailures, int64(r.FallbackFailures), dc.tags, 1.0) + } -// UpdateTotalDuration updates the internal counter of how long we've run for. -func (dc *DatadogCollector) UpdateTotalDuration(timeSinceStart time.Duration) { - ms := float64(timeSinceStart.Nanoseconds() / 1000000) + ms := float64(r.TotalDuration.Nanoseconds() / 1000000) dc.client.TimeInMilliseconds(DM_TotalDuration, ms, dc.tags, 1.0) -} -// UpdateRunDuration updates the internal counter of how long the last run took. -func (dc *DatadogCollector) UpdateRunDuration(runDuration time.Duration) { - ms := float64(runDuration.Nanoseconds() / 1000000) + ms = float64(r.RunDuration.Nanoseconds() / 1000000) dc.client.TimeInMilliseconds(DM_RunDuration, ms, dc.tags, 1.0) } diff --git a/plugins/graphite_aggregator.go b/plugins/graphite_aggregator.go index f26e916..3a3b74f 100644 --- a/plugins/graphite_aggregator.go +++ b/plugins/graphite_aggregator.go @@ -72,12 +72,15 @@ func NewGraphiteCollector(name string) metricCollector.MetricCollector { } } -func (g *GraphiteCollector) incrementCounterMetric(prefix string) { +func (g *GraphiteCollector) incrementCounterMetric(prefix string, i float64) { + if i == 0 { + return + } c, ok := metrics.GetOrRegister(prefix, makeCounterFunc).(metrics.Counter) if !ok { return } - c.Inc(1) + c.Inc(int64(i)) } func (g *GraphiteCollector) updateTimerMetric(prefix string, dur time.Duration) { @@ -88,74 +91,18 @@ func (g *GraphiteCollector) updateTimerMetric(prefix string, dur time.Duration) c.Update(dur) } -// IncrementAttempts increments the number of calls to this circuit. -// This registers as a counter in the graphite collector. -func (g *GraphiteCollector) IncrementAttempts() { - g.incrementCounterMetric(g.attemptsPrefix) -} - -// IncrementErrors increments the number of unsuccessful attempts. -// Attempts minus Errors will equal successes within a time range. -// Errors are any result from an attempt that is not a success. -// This registers as a counter in the graphite collector. -func (g *GraphiteCollector) IncrementErrors() { - g.incrementCounterMetric(g.errorsPrefix) - -} - -// IncrementSuccesses increments the number of requests that succeed. -// This registers as a counter in the graphite collector. -func (g *GraphiteCollector) IncrementSuccesses() { - g.incrementCounterMetric(g.successesPrefix) - -} - -// IncrementFailures increments the number of requests that fail. -// This registers as a counter in the graphite collector. -func (g *GraphiteCollector) IncrementFailures() { - g.incrementCounterMetric(g.failuresPrefix) -} - -// IncrementRejects increments the number of requests that are rejected. -// This registers as a counter in the graphite collector. -func (g *GraphiteCollector) IncrementRejects() { - g.incrementCounterMetric(g.rejectsPrefix) -} - -// IncrementShortCircuits increments the number of requests that short circuited due to the circuit being open. -// This registers as a counter in the graphite collector. -func (g *GraphiteCollector) IncrementShortCircuits() { - g.incrementCounterMetric(g.shortCircuitsPrefix) -} - -// IncrementTimeouts increments the number of timeouts that occurred in the circuit breaker. -// This registers as a counter in the graphite collector. -func (g *GraphiteCollector) IncrementTimeouts() { - g.incrementCounterMetric(g.timeoutsPrefix) -} - -// IncrementFallbackSuccesses increments the number of successes that occurred during the execution of the fallback function. -// This registers as a counter in the graphite collector. -func (g *GraphiteCollector) IncrementFallbackSuccesses() { - g.incrementCounterMetric(g.fallbackSuccessesPrefix) -} - -// IncrementFallbackFailures increments the number of failures that occurred during the execution of the fallback function. -// This registers as a counter in the graphite collector. -func (g *GraphiteCollector) IncrementFallbackFailures() { - g.incrementCounterMetric(g.fallbackFailuresPrefix) -} - -// UpdateTotalDuration updates the internal counter of how long we've run for. -// This registers as a timer in the graphite collector. -func (g *GraphiteCollector) UpdateTotalDuration(timeSinceStart time.Duration) { - g.updateTimerMetric(g.totalDurationPrefix, timeSinceStart) -} - -// UpdateRunDuration updates the internal counter of how long the last run took. -// This registers as a timer in the graphite collector. -func (g *GraphiteCollector) UpdateRunDuration(runDuration time.Duration) { - g.updateTimerMetric(g.runDurationPrefix, runDuration) +func (g *GraphiteCollector) Update(r metricCollector.MetricResult) { + g.incrementCounterMetric(g.attemptsPrefix, r.Attempts) + g.incrementCounterMetric(g.errorsPrefix, r.Errors) + g.incrementCounterMetric(g.successesPrefix, r.Successes) + g.incrementCounterMetric(g.failuresPrefix, r.Failures) + g.incrementCounterMetric(g.rejectsPrefix, r.Rejects) + g.incrementCounterMetric(g.shortCircuitsPrefix, r.ShortCircuits) + g.incrementCounterMetric(g.timeoutsPrefix, r.Timeouts) + g.incrementCounterMetric(g.fallbackSuccessesPrefix, r.FallbackSuccesses) + g.incrementCounterMetric(g.fallbackFailuresPrefix, r.FallbackFailures) + g.updateTimerMetric(g.totalDurationPrefix, r.TotalDuration) + g.updateTimerMetric(g.runDurationPrefix, r.RunDuration) } // Reset is a noop operation in this collector. diff --git a/plugins/prometheus_collector.go b/plugins/prometheus_collector.go index f79f2a1..555c63f 100644 --- a/plugins/prometheus_collector.go +++ b/plugins/prometheus_collector.go @@ -3,7 +3,6 @@ package plugins import ( "github.com/afex/hystrix-go/hystrix/metric_collector" "github.com/prometheus/client_golang/prometheus" - "time" ) // Constant namespace for metrics @@ -163,64 +162,19 @@ func (hm *PrometheusCollector) Collector(name string) metricCollector.MetricColl return hc } -// IncrementAttempts increments the number of updates. -func (hc *cmdCollector) IncrementAttempts() { - hc.metrics.attempts.WithLabelValues(hc.commandName).Inc() -} - -// IncrementErrors increments the number of unsuccessful attempts. -// Attempts minus Errors will equal successes within a time range. -// Errors are any result from an attempt that is not a success. -func (hc *cmdCollector) IncrementErrors() { - hc.metrics.errors.WithLabelValues(hc.commandName).Inc() -} - -// IncrementSuccesses increments the number of requests that succeed. -func (hc *cmdCollector) IncrementSuccesses() { - hc.metrics.successes.WithLabelValues(hc.commandName).Inc() -} - -// IncrementFailures increments the number of requests that fail. -func (hc *cmdCollector) IncrementFailures() { - hc.metrics.failures.WithLabelValues(hc.commandName).Inc() -} - -// IncrementRejects increments the number of requests that are rejected. -func (hc *cmdCollector) IncrementRejects() { - hc.metrics.rejects.WithLabelValues(hc.commandName).Inc() -} - -// IncrementShortCircuits increments the number of requests that short circuited due to the circuit being open. -func (hc *cmdCollector) IncrementShortCircuits() { - hc.metrics.shortCircuits.WithLabelValues(hc.commandName).Inc() -} - -// IncrementTimeouts increments the number of timeouts that occurred in the circuit breaker. -func (hc *cmdCollector) IncrementTimeouts() { - hc.metrics.timeouts.WithLabelValues(hc.commandName).Inc() -} - -// IncrementFallbackSuccesses increments the number of successes that occurred during the execution of the fallback function. -func (hc *cmdCollector) IncrementFallbackSuccesses() { - hc.metrics.fallbackSuccesses.WithLabelValues(hc.commandName).Inc() -} - -// IncrementFallbackFailures increments the number of failures that occurred during the execution of the fallback function. -func (hc *cmdCollector) IncrementFallbackFailures() { - hc.metrics.fallbackFailures.WithLabelValues(hc.commandName).Inc() -} - -// UpdateTotalDuration updates the internal counter of how long we've run for. -func (hc *cmdCollector) UpdateTotalDuration(timeSinceStart time.Duration) { - hc.metrics.totalDuration.WithLabelValues(hc.commandName).Set(timeSinceStart.Seconds()) -} - -// UpdateRunDuration updates the internal counter of how long the last run took. -func (hc *cmdCollector) UpdateRunDuration(runDuration time.Duration) { - hc.metrics.runDuration.WithLabelValues(hc.commandName).Observe(runDuration.Seconds()) +func (hc *cmdCollector) Update(result metricCollector.MetricResult) { + hc.metrics.attempts.WithLabelValues(hc.commandName).Add(result.Attempts) + hc.metrics.errors.WithLabelValues(hc.commandName).Add(result.Errors) + hc.metrics.successes.WithLabelValues(hc.commandName).Add(result.Successes) + hc.metrics.failures.WithLabelValues(hc.commandName).Add(result.Failures) + hc.metrics.rejects.WithLabelValues(hc.commandName).Add(result.Rejects) + hc.metrics.shortCircuits.WithLabelValues(hc.commandName).Add(result.ShortCircuits) + hc.metrics.timeouts.WithLabelValues(hc.commandName).Add(result.Timeouts) + hc.metrics.fallbackSuccesses.WithLabelValues(hc.commandName).Add(result.FallbackSuccesses) + hc.metrics.fallbackFailures.WithLabelValues(hc.commandName).Add(result.FallbackFailures) + hc.metrics.totalDuration.WithLabelValues(hc.commandName).Set(result.TotalDuration.Seconds()) } // Reset resets the internal counters and timers. func (hc *cmdCollector) Reset() { - } diff --git a/plugins/statsd_collector.go b/plugins/statsd_collector.go index 8c0895d..683cfcb 100644 --- a/plugins/statsd_collector.go +++ b/plugins/statsd_collector.go @@ -116,8 +116,8 @@ func (g *StatsdCollector) setGauge(prefix string, value int64) { } } -func (g *StatsdCollector) incrementCounterMetric(prefix string) { - err := g.client.Inc(prefix, 1, g.sampleRate) +func (g *StatsdCollector) incrementCounterMetric(prefix string, i float64) { + err := g.client.Inc(prefix, int64(i), g.sampleRate) if err != nil { log.Printf("Error sending statsd metrics %s", prefix) } @@ -130,76 +130,24 @@ func (g *StatsdCollector) updateTimerMetric(prefix string, dur time.Duration) { } } -// IncrementAttempts increments the number of calls to this circuit. -// This registers as a counter in the Statsd collector. -func (g *StatsdCollector) IncrementAttempts() { - g.incrementCounterMetric(g.attemptsPrefix) -} - -// IncrementErrors increments the number of unsuccessful attempts. -// Attempts minus Errors will equal successes within a time range. -// Errors are any result from an attempt that is not a success. -// This registers as a counter in the Statsd collector. -func (g *StatsdCollector) IncrementErrors() { - g.incrementCounterMetric(g.errorsPrefix) - -} - -// IncrementSuccesses increments the number of requests that succeed. -// This registers as a counter in the Statsd collector. -func (g *StatsdCollector) IncrementSuccesses() { - g.setGauge(g.circuitOpenPrefix, 0) - g.incrementCounterMetric(g.successesPrefix) - -} - -// IncrementFailures increments the number of requests that fail. -// This registers as a counter in the Statsd collector. -func (g *StatsdCollector) IncrementFailures() { - g.incrementCounterMetric(g.failuresPrefix) -} - -// IncrementRejects increments the number of requests that are rejected. -// This registers as a counter in the Statsd collector. -func (g *StatsdCollector) IncrementRejects() { - g.incrementCounterMetric(g.rejectsPrefix) -} - -// IncrementShortCircuits increments the number of requests that short circuited due to the circuit being open. -// This registers as a counter in the Statsd collector. -func (g *StatsdCollector) IncrementShortCircuits() { - g.setGauge(g.circuitOpenPrefix, 1) - g.incrementCounterMetric(g.shortCircuitsPrefix) -} - -// IncrementTimeouts increments the number of timeouts that occurred in the circuit breaker. -// This registers as a counter in the Statsd collector. -func (g *StatsdCollector) IncrementTimeouts() { - g.incrementCounterMetric(g.timeoutsPrefix) -} - -// IncrementFallbackSuccesses increments the number of successes that occurred during the execution of the fallback function. -// This registers as a counter in the Statsd collector. -func (g *StatsdCollector) IncrementFallbackSuccesses() { - g.incrementCounterMetric(g.fallbackSuccessesPrefix) -} - -// IncrementFallbackFailures increments the number of failures that occurred during the execution of the fallback function. -// This registers as a counter in the Statsd collector. -func (g *StatsdCollector) IncrementFallbackFailures() { - g.incrementCounterMetric(g.fallbackFailuresPrefix) -} - -// UpdateTotalDuration updates the internal counter of how long we've run for. -// This registers as a timer in the Statsd collector. -func (g *StatsdCollector) UpdateTotalDuration(timeSinceStart time.Duration) { - g.updateTimerMetric(g.totalDurationPrefix, timeSinceStart) -} +func (g *StatsdCollector) Update(r metricCollector.MetricResult) { + if r.Successes > 0 { + g.setGauge(g.circuitOpenPrefix, 0) + } else if r.ShortCircuits > 0 { + g.setGauge(g.circuitOpenPrefix, 1) + } -// UpdateRunDuration updates the internal counter of how long the last run took. -// This registers as a timer in the Statsd collector. -func (g *StatsdCollector) UpdateRunDuration(runDuration time.Duration) { - g.updateTimerMetric(g.runDurationPrefix, runDuration) + g.incrementCounterMetric(g.attemptsPrefix, r.Attempts) + g.incrementCounterMetric(g.errorsPrefix, r.Errors) + g.incrementCounterMetric(g.successesPrefix, r.Successes) + g.incrementCounterMetric(g.failuresPrefix, r.Failures) + g.incrementCounterMetric(g.rejectsPrefix, r.Rejects) + g.incrementCounterMetric(g.shortCircuitsPrefix, r.ShortCircuits) + g.incrementCounterMetric(g.timeoutsPrefix, r.Timeouts) + g.incrementCounterMetric(g.fallbackSuccessesPrefix, r.FallbackSuccesses) + g.incrementCounterMetric(g.fallbackFailuresPrefix, r.FallbackFailures) + g.updateTimerMetric(g.totalDurationPrefix, r.TotalDuration) + g.updateTimerMetric(g.runDurationPrefix, r.RunDuration) } // Reset is a noop operation in this collector. diff --git a/scripts/vagrant.sh b/scripts/vagrant.sh index e7a2267..c31369f 100644 --- a/scripts/vagrant.sh +++ b/scripts/vagrant.sh @@ -1,8 +1,8 @@ #!/bin/bash set -e -wget -q https://storage.googleapis.com/golang/go1.6.2.linux-amd64.tar.gz -tar -C /usr/local -xzf go1.6.2.linux-amd64.tar.gz +wget -q https://storage.googleapis.com/golang/go1.9.4.linux-amd64.tar.gz +tar -C /usr/local -xzf go1.9.4.linux-amd64.tar.gz apt-get update apt-get -y install git mercurial apache2-utils @@ -20,3 +20,5 @@ go get github.com/rcrowley/go-metrics go get github.com/DataDog/datadog-go/statsd chown -R vagrant:vagrant /go + +echo "cd /go/src/github.com/afex/hystrix-go" >> /home/vagrant/.bashrc