Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor checker worker: change its queue from go channels to Redis sets #103

Merged
merged 6 commits into from
Jul 31, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions checker/worker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,32 @@ import (
"time"

"github.com/moira-alert/moira/checker"
"github.com/moira-alert/moira/database"
)

// Pause durations used by the trigger handling code in this file.
const (
	// sleepAfterGetTriggerIDError is the pause after fetching the next trigger ID failed.
	sleepAfterGetTriggerIDError = 500 * time.Millisecond
	// sleepWhenNoTriggerToCheck is the pause when there is no trigger waiting to be checked.
	sleepWhenNoTriggerToCheck = time.Second
	// sleepAfterPanic is the pause after recovering from a panic while handling a trigger.
	sleepAfterPanic = time.Second
	// sleepAfterCheckingError is the pause after handling a trigger returned an error.
	sleepAfterCheckingError = 5 * time.Second
)

func (worker *Checker) startTriggerHandler() error {
for {
triggerID, ok := <-worker.triggersToCheck
if !ok {
select {
case <-worker.tomb.Dying():
return nil
default:
triggerID, err := worker.Database.GetTriggerToCheck()
if err != nil {
if err == database.ErrNil {
<-time.After(sleepWhenNoTriggerToCheck)
} else {
worker.Logger.Errorf("Failed to handle trigger loop: %s", err.Error())
<-time.After(sleepAfterGetTriggerIDError)
}
continue
}
worker.handleTrigger(triggerID)
}
worker.handleTrigger(triggerID)
}
}

Expand All @@ -22,11 +39,13 @@ func (worker *Checker) handleTrigger(triggerID string) {
if r := recover(); r != nil {
worker.Metrics.HandleError.Mark(1)
worker.Logger.Errorf("Panic while handle trigger %s: message: '%s' stack: %s", triggerID, r, debug.Stack())
<-time.After(sleepAfterPanic)
}
}()
if err := worker.handleTriggerInLock(triggerID); err != nil {
worker.Metrics.HandleError.Mark(1)
worker.Logger.Errorf("Failed to handle trigger: %s error: %s", triggerID, err.Error())
<-time.After(sleepAfterCheckingError)
}
}

Expand Down
6 changes: 5 additions & 1 deletion checker/worker/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,15 @@ func (worker *Checker) handleMetricEvent(pattern string) error {
}

// addTriggerIDsIfNeeded keeps only the trigger IDs for which needHandleTrigger
// reports true and hands them to the database via AddTriggersToCheck.
// Nothing is written when no ID passes the filter. A failure to store the IDs
// is logged, because this function has no error return.
func (worker *Checker) addTriggerIDsIfNeeded(triggerIDs []string) {
	// Length 0 with preallocated capacity: a non-zero length here would leave
	// len(triggerIDs) empty strings in front of the appended IDs, and those
	// empty IDs would be queued for checking as well.
	needToCheckTriggerIDs := make([]string, 0, len(triggerIDs))
	for _, triggerID := range triggerIDs {
		if worker.needHandleTrigger(triggerID) {
			// NOTE(review): the ID is both sent on the in-process channel and
			// collected for the database queue — this looks like a leftover of
			// the channel-based queue; confirm whether the send should remain.
			worker.triggersToCheck <- triggerID
			needToCheckTriggerIDs = append(needToCheckTriggerIDs, triggerID)
		}
	}
	if len(needToCheckTriggerIDs) == 0 {
		return
	}
	if err := worker.Database.AddTriggersToCheck(needToCheckTriggerIDs); err != nil {
		worker.Logger.Errorf("Failed to queue triggers to check: %s", err.Error())
	}
}

func (worker *Checker) needHandleTrigger(triggerID string) bool {
Expand Down
32 changes: 8 additions & 24 deletions checker/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@ import (

// Checker represents workers that periodically check triggers based on new events
type Checker struct {
Logger moira.Logger
Database moira.Database
Config *checker.Config
Metrics *graphite.CheckerMetrics
TriggerCache *cache.Cache
PatternCache *cache.Cache
lastData int64
tomb tomb.Tomb
triggersToCheck chan string
Logger moira.Logger
Database moira.Database
Config *checker.Config
Metrics *graphite.CheckerMetrics
TriggerCache *cache.Cache
PatternCache *cache.Cache
lastData int64
tomb tomb.Tomb
}

// Start starts scheduling new MetricEvents and checking for NODATA triggers
Expand All @@ -32,7 +31,6 @@ func (worker *Checker) Start() error {
}

worker.lastData = time.Now().UTC().Unix()
worker.triggersToCheck = make(chan string, 16384)

metricEventsChannel, err := worker.Database.SubscribeMetricEvents(&worker.tomb)
if err != nil {
Expand All @@ -52,29 +50,15 @@ func (worker *Checker) Start() error {
go func() {
for {
<-worker.tomb.Dying()
close(worker.triggersToCheck)
worker.Logger.Info("Checking for new events stopped")
return
}
}()

worker.tomb.Go(worker.checkTriggersToCheckChannelLen)
worker.tomb.Go(func() error { return worker.checkMetricEventsChannelLen(metricEventsChannel) })
return nil
}

// checkTriggersToCheckChannelLen reports the current length of the
// triggersToCheck channel to the TriggersToCheckChannelLen metric every
// 100 milliseconds. It returns nil once the worker's tomb is dying.
func (worker *Checker) checkTriggersToCheckChannelLen() error {
	checkTicker := time.NewTicker(time.Millisecond * 100)
	// Release the ticker's resources when the loop exits.
	defer checkTicker.Stop()
	for {
		select {
		case <-worker.tomb.Dying():
			return nil
		case <-checkTicker.C:
			worker.Metrics.TriggersToCheckChannelLen.Update(int64(len(worker.triggersToCheck)))
		}
	}
}

func (worker *Checker) checkMetricEventsChannelLen(ch <-chan *moira.MetricEvent) error {
checkTicker := time.NewTicker(time.Millisecond * 100)
for {
Expand Down
40 changes: 40 additions & 0 deletions database/redis/triggers_to_check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package redis

import (
"fmt"

"github.com/garyburd/redigo/redis"
"github.com/moira-alert/moira/database"
)

// AddTriggersToCheck stores the given trigger IDs in the Redis set of triggers
// waiting to be checked. One SADD per ID is queued inside a single MULTI/EXEC
// transaction. A non-nil error is returned when EXEC fails.
func (connector *DbConnector) AddTriggersToCheck(triggerIDs []string) error {
	conn := connector.pool.Get()
	defer conn.Close()

	conn.Send("MULTI")
	for i := range triggerIDs {
		conn.Send("SADD", triggerToCheckKey, triggerIDs[i])
	}

	if _, err := redis.Values(conn.Do("EXEC")); err != nil {
		return fmt.Errorf("failed to add triggers to check: %s", err.Error())
	}
	return nil
}

// GetTriggerToCheck pops one trigger ID from the Redis set of triggers waiting
// to be checked (SPOP). It returns database.ErrNil when Redis replies with nil,
// and a wrapped error for any other failure.
func (connector *DbConnector) GetTriggerToCheck() (string, error) {
	conn := connector.pool.Get()
	defer conn.Close()

	triggerID, err := redis.String(conn.Do("SPOP", triggerToCheckKey))
	switch {
	case err == nil:
		return triggerID, nil
	case err == redis.ErrNil:
		// Translate the driver's "nil reply" into the package-neutral sentinel.
		return "", database.ErrNil
	default:
		return "", fmt.Errorf("failed to pop trigger to check: %s", err.Error())
	}
}

// triggerToCheckKey is the Redis key of the set used by AddTriggersToCheck (SADD)
// and GetTriggerToCheck (SPOP).
var triggerToCheckKey = "moira-triggers-to-check"
96 changes: 96 additions & 0 deletions database/redis/triggers_to_check_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package redis

import (
"testing"

"github.com/moira-alert/moira/database"
"github.com/satori/go.uuid"
. "github.com/smartystreets/goconvey/convey"

"github.com/moira-alert/moira/logging/go-logging"
)

// TestTriggerToCheck exercises AddTriggersToCheck and GetTriggerToCheck in
// sequence against the database built from config. Each step depends on the
// state left by the previous one.
func TestTriggerToCheck(t *testing.T) {
logger, _ := logging.ConfigureLog("stdout", "info", "test")
dataBase := NewDatabase(logger, config)
// flush runs before the scenario and again on exit — presumably to clear test data; confirm against flush.
dataBase.flush()
defer dataBase.flush()
Convey("Trigger to check get and add", t, func() {
triggerID1 := uuid.NewV4().String()
triggerID2 := uuid.NewV4().String()
triggerID3 := uuid.NewV4().String()

// Nothing has been added yet: expect database.ErrNil and an empty ID.
actual, err := dataBase.GetTriggerToCheck()
So(err, ShouldResemble, database.ErrNil)
So(actual, ShouldBeEmpty)

// A single added ID comes back from the next get.
err = dataBase.AddTriggersToCheck([]string{triggerID1})
So(err, ShouldBeNil)

actual, err = dataBase.GetTriggerToCheck()
So(err, ShouldBeNil)
So(actual, ShouldResemble, triggerID1)

// Adding the same ID twice: it is expected back only once, then ErrNil.
err = dataBase.AddTriggersToCheck([]string{triggerID1})
So(err, ShouldBeNil)

err = dataBase.AddTriggersToCheck([]string{triggerID1})
So(err, ShouldBeNil)

actual, err = dataBase.GetTriggerToCheck()
So(err, ShouldBeNil)
So(actual, ShouldResemble, triggerID1)

actual, err = dataBase.GetTriggerToCheck()
So(err, ShouldResemble, database.ErrNil)
So(actual, ShouldBeEmpty)

// Three distinct IDs: the return order is not asserted, only membership.
triggerArr := []string{triggerID1, triggerID2, triggerID3}
err = dataBase.AddTriggersToCheck(triggerArr)
So(err, ShouldBeNil)

// Each returned ID is removed from the candidates before the next get.
actual, err = dataBase.GetTriggerToCheck()
So(err, ShouldBeNil)
So(actual, ShouldBeIn, triggerArr)
triggerArr = removeValue(triggerArr, actual)

actual, err = dataBase.GetTriggerToCheck()
So(err, ShouldBeNil)
So(actual, ShouldBeIn, triggerArr)
triggerArr = removeValue(triggerArr, actual)

actual, err = dataBase.GetTriggerToCheck()
So(err, ShouldBeNil)
So(actual, ShouldBeIn, triggerArr)

// After three successful gets the next one is expected to report ErrNil again.
actual, err = dataBase.GetTriggerToCheck()
So(err, ShouldResemble, database.ErrNil)
So(actual, ShouldBeEmpty)
})
}

// TestTriggerToCheckConnection checks that both AddTriggersToCheck and
// GetTriggerToCheck return an error for a database built from emptyConfig.
// NOTE(review): emptyConfig presumably points at no reachable Redis — confirm where it is defined.
func TestTriggerToCheckConnection(t *testing.T) {
logger, _ := logging.ConfigureLog("stdout", "info", "test")
dataBase := NewDatabase(logger, emptyConfig)
dataBase.flush()
defer dataBase.flush()
Convey("Should throw error when no connection", t, func() {
err := dataBase.AddTriggersToCheck([]string{"123"})
So(err, ShouldNotBeNil)

// On failure the getter must return an empty ID together with the error.
triggerID, err := dataBase.GetTriggerToCheck()
So(triggerID, ShouldBeEmpty)
So(err, ShouldNotBeNil)
})
}

// removeValue returns triggerArr without the first occurrence of triggerID.
// When triggerID is not present (including for an empty or nil slice) the
// slice is returned unchanged.
// The removal is done in place, so the result shares triggerArr's backing
// array; callers should use the returned slice instead of the argument.
func removeValue(triggerArr []string, triggerID string) []string {
	for i, trigger := range triggerArr {
		if trigger == triggerID {
			return append(triggerArr[:i], triggerArr[i+1:]...)
		}
	}
	// Not found: do not drop any element.
	return triggerArr
}
3 changes: 3 additions & 0 deletions interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ type Database interface {
RemoveMetricValues(metric string, toTime int64) error
RemoveMetricsValues(metrics []string, toTime int64) error

AddTriggersToCheck(triggerIDs []string) error
GetTriggerToCheck() (string, error)

// TriggerCheckLock storing
AcquireTriggerCheckLock(triggerID string, timeout int) error
DeleteTriggerCheckLock(triggerID string) error
Expand Down
13 changes: 6 additions & 7 deletions metrics/graphite/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ package graphite

// CheckerMetrics is a collection of metrics used in checker
type CheckerMetrics struct {
CheckError Meter
HandleError Meter
TriggersCheckTime Timer
TriggerCheckTime TimerMap
TriggersToCheckChannelLen Histogram
MetricEventsChannelLen Histogram
MetricEventsHandleTime Timer
CheckError Meter
HandleError Meter
TriggersCheckTime Timer
TriggerCheckTime TimerMap
MetricEventsChannelLen Histogram
MetricEventsHandleTime Timer
}
13 changes: 6 additions & 7 deletions metrics/graphite/go-metrics/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,12 @@ func ConfigureNotifierMetrics(prefix string) *graphite.NotifierMetrics {
// ConfigureCheckerMetrics is checker metrics configurator
func ConfigureCheckerMetrics(prefix string) *graphite.CheckerMetrics {
return &graphite.CheckerMetrics{
CheckError: registerMeter(metricNameWithPrefix(prefix, "errors.check")),
HandleError: registerMeter(metricNameWithPrefix(prefix, "errors.handle")),
TriggersCheckTime: registerTimer(metricNameWithPrefix(prefix, "triggers")),
TriggerCheckTime: newTimerMap(metricNameWithPrefix(prefix, "trigger")),
TriggersToCheckChannelLen: registerHistogram(metricNameWithPrefix(prefix, "triggersToCheck")),
MetricEventsChannelLen: registerHistogram(metricNameWithPrefix(prefix, "metricEvents")),
MetricEventsHandleTime: registerTimer(metricNameWithPrefix(prefix, "metricEventsHandle")),
CheckError: registerMeter(metricNameWithPrefix(prefix, "errors.check")),
HandleError: registerMeter(metricNameWithPrefix(prefix, "errors.handle")),
TriggersCheckTime: registerTimer(metricNameWithPrefix(prefix, "triggers")),
TriggerCheckTime: newTimerMap(metricNameWithPrefix(prefix, "trigger")),
MetricEventsChannelLen: registerHistogram(metricNameWithPrefix(prefix, "metricEvents")),
MetricEventsHandleTime: registerTimer(metricNameWithPrefix(prefix, "metricEventsHandle")),
}
}

Expand Down
33 changes: 29 additions & 4 deletions mock/moira-alert/database.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.