Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update from upstream and adapt to new MetricCollector interface #2

Merged
merged 18 commits into from
Feb 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ script:
- cd hystrix
- go test -race
go:
- 1.3.3
- 1.4.3
- 1.5.4
- 1.6.0
- 1.6.1
- 1.6.2
- 1.6.x
- 1.7.x
- 1.8.x
- 1.9.x
- tip
env:
global:
- GORACE="halt_on_error=1"
- GORACE="halt_on_error=1"
5 changes: 4 additions & 1 deletion hystrix/circuit.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,10 @@ func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time,
return fmt.Errorf("no event types sent for metrics")
}

if eventTypes[0] == "success" && circuit.open {
circuit.mutex.RLock()
o := circuit.open
circuit.mutex.RUnlock()
if eventTypes[0] == "success" && o {
circuit.setClose()
}

Expand Down
53 changes: 53 additions & 0 deletions hystrix/circuit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"time"

. "github.com/smartystreets/goconvey/convey"
"testing/quick"
"math/rand"
)

func TestGetCircuit(t *testing.T) {
Expand Down Expand Up @@ -94,3 +96,54 @@ func TestReportEventOpenThenClose(t *testing.T) {
})
})
}

func TestReportEventMultiThreaded(t *testing.T) {
rand.Seed(time.Now().UnixNano())
run := func() bool {
defer Flush()
// Make the circuit easily open and close intermittently.
ConfigureCommand("", CommandConfig{
MaxConcurrentRequests: 1,
ErrorPercentThreshold: 1,
RequestVolumeThreshold: 1,
SleepWindow: 10,
})
cb, _, _ := GetCircuit("")
count := 5
wg := &sync.WaitGroup{}
wg.Add(count)
c := make(chan bool, count)
for i := 0; i < count; i++ {
go func() {
defer func() {
if r := recover(); r != nil {
t.Error(r)
c <- false
} else {
wg.Done()
}
}()
// randomized eventType to open/close circuit
eventType := "rejected"
if rand.Intn(3) == 1 {
eventType = "success"
}
err := cb.ReportEvent([]string{eventType}, time.Now(), time.Second)
if err != nil {
t.Error(err)
}
time.Sleep(time.Millisecond)
// cb.IsOpen() internally calls cb.setOpen()
cb.IsOpen()
}()
}
go func() {
wg.Wait()
c <- true
}()
return <-c
}
if err := quick.Check(run, nil); err != nil {
t.Error(err)
}
}
115 changes: 65 additions & 50 deletions hystrix/hystrix.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@ type command struct {
start time.Time
errChan chan error
finished chan bool
fallbackOnce *sync.Once
circuit *CircuitBreaker
run runFunc
fallback fallbackFunc
runDuration time.Duration
events []string
timedOut bool
}

var (
Expand All @@ -59,7 +57,6 @@ func Go(name string, run runFunc, fallback fallbackFunc) chan error {
start: time.Now(),
errChan: make(chan error, 1),
finished: make(chan bool, 1),
fallbackOnce: &sync.Once{},
}

// dont have methods with explicit params and returns
Expand All @@ -72,6 +69,29 @@ func Go(name string, run runFunc, fallback fallbackFunc) chan error {
return cmd.errChan
}
cmd.circuit = circuit
ticketCond := sync.NewCond(cmd)
ticketChecked := false
// When the caller extracts error from returned errChan, it's assumed that
// the ticket's been returned to executorPool. Therefore, returnTicket() can
// not run after cmd.errorWithFallback().
returnTicket := func() {
cmd.Lock()
// Avoid releasing before a ticket is acquired.
for !ticketChecked {
ticketCond.Wait()
}
cmd.circuit.executorPool.Return(cmd.ticket)
cmd.Unlock()
}
// Shared by the following two goroutines. It ensures only the faster
// goroutine runs errWithFallback() and reportAllEvent().
returnOnce := &sync.Once{}
reportAllEvent := func() {
err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
if err != nil {
log.Print(err)
}
}

go func() {
defer func() { cmd.finished <- true }()
Expand All @@ -80,7 +100,16 @@ func Go(name string, run runFunc, fallback fallbackFunc) chan error {
// Rejecting new executions allows backends to recover, and the circuit will allow
// new traffic when it feels a healthly state has returned.
if !cmd.circuit.AllowRequest() {
cmd.errorWithFallback(ErrCircuitOpen)
cmd.Lock()
// It's safe for another goroutine to go ahead releasing a nil ticket.
ticketChecked = true
ticketCond.Signal()
cmd.Unlock()
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ErrCircuitOpen)
reportAllEvent()
})
return
}

Expand All @@ -92,51 +121,48 @@ func Go(name string, run runFunc, fallback fallbackFunc) chan error {
cmd.Lock()
select {
case cmd.ticket = <-circuit.executorPool.Tickets:
ticketChecked = true
ticketCond.Signal()
cmd.Unlock()
default:
ticketChecked = true
ticketCond.Signal()
cmd.Unlock()
cmd.errorWithFallback(ErrMaxConcurrency)
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ErrMaxConcurrency)
reportAllEvent()
})
return
}

runStart := time.Now()
runErr := run()

if !cmd.isTimedOut() {
returnOnce.Do(func() {
defer reportAllEvent()
cmd.runDuration = time.Since(runStart)

returnTicket()
if runErr != nil {
cmd.errorWithFallback(runErr)
return
}

cmd.reportEvent("success")
}
})
}()

go func() {
defer func() {
cmd.Lock()
cmd.circuit.executorPool.Return(cmd.ticket)
cmd.Unlock()

err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
if err != nil {
log.Print(err)
}
}()

timer := time.NewTimer(getSettings(name).Timeout)
defer timer.Stop()

select {
case <-cmd.finished:
// returnOnce has been executed in another goroutine
case <-timer.C:
cmd.Lock()
cmd.timedOut = true
cmd.Unlock()

cmd.errorWithFallback(ErrTimeout)
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ErrTimeout)
reportAllEvent()
})
return
}
}()
Expand Down Expand Up @@ -191,33 +217,22 @@ func (c *command) reportEvent(eventType string) {
c.events = append(c.events, eventType)
}

func (c *command) isTimedOut() bool {
c.Lock()
defer c.Unlock()

return c.timedOut
}

// errorWithFallback triggers the fallback while reporting the appropriate metric events.
// If called multiple times for a single command, only the first will execute to insure
// accurate metrics and prevent the fallback from executing more than once.
func (c *command) errorWithFallback(err error) {
c.fallbackOnce.Do(func() {
eventType := "failure"
if err == ErrCircuitOpen {
eventType = "short-circuit"
} else if err == ErrMaxConcurrency {
eventType = "rejected"
} else if err == ErrTimeout {
eventType = "timeout"
}
eventType := "failure"
if err == ErrCircuitOpen {
eventType = "short-circuit"
} else if err == ErrMaxConcurrency {
eventType = "rejected"
} else if err == ErrTimeout {
eventType = "timeout"
}

c.reportEvent(eventType)
fallbackErr := c.tryFallback(err)
if fallbackErr != nil {
c.errChan <- fallbackErr
}
})
c.reportEvent(eventType)
fallbackErr := c.tryFallback(err)
if fallbackErr != nil {
c.errChan <- fallbackErr
}
}

func (c *command) tryFallback(err error) error {
Expand Down
27 changes: 26 additions & 1 deletion hystrix/hystrix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

. "github.com/smartystreets/goconvey/convey"
"testing/quick"
)

func TestSuccess(t *testing.T) {
Expand Down Expand Up @@ -339,6 +340,30 @@ func TestFallbackAfterRejected(t *testing.T) {
})
}

func TestReturnTicket_QuickCheck(t *testing.T) {
compareTicket := func() bool {
defer Flush()
ConfigureCommand("", CommandConfig{Timeout: 2})
errChan := Go("", func() error {
c := make(chan struct{})
<-c // should block
return nil
}, nil)
err := <-errChan
So(err, ShouldResemble, ErrTimeout)
cb, _, err := GetCircuit("")
So(err, ShouldBeNil)
return cb.executorPool.ActiveCount() == 0
}

Convey("with a run command that doesn't return", t, func() {
Convey("checking many times that after Go(), the ticket returns to the pool after the timeout", func() {
err := quick.Check(compareTicket, nil)
So(err, ShouldBeNil)
})
})
}

func TestReturnTicket(t *testing.T) {
Convey("with a run command that doesn't return", t, func() {
defer Flush()
Expand All @@ -351,7 +376,7 @@ func TestReturnTicket(t *testing.T) {
return nil
}, nil)

Convey("the ticket returns to the pool after the timeout", func() {
Convey("after Go(), the ticket returns to the pool after the timeout", func() {
err := <-errChan
So(err, ShouldResemble, ErrTimeout)

Expand Down
Loading