diff --git a/go.mod b/go.mod index 3aac7fa..787dcec 100644 --- a/go.mod +++ b/go.mod @@ -42,6 +42,7 @@ require ( github.com/prometheus/client_golang v1.12.2 github.com/robfig/cron/v3 v3.0.1 github.com/sethvargo/go-retry v0.1.0 + github.com/slack-go/slack v0.10.3 // indirect github.com/spf13/viper v1.11.0 github.com/stretchr/objx v0.3.0 // indirect github.com/stretchr/testify v1.7.1 diff --git a/go.sum b/go.sum index 897bdad..40df524 100644 --- a/go.sum +++ b/go.sum @@ -633,6 +633,7 @@ github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfC github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= +github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/go-zookeeper/zk v1.0.2/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw= github.com/gobuffalo/attrs v0.0.0-20190224210810-a9411de4debd/go.mod h1:4duuawTqi2wkkpB4ePgWMaai6/Kc6WEz83bhFwpHzj0= github.com/gobuffalo/depgen v0.0.0-20190329151759-d478694a28d3/go.mod h1:3STtPUQYuzV0gBVOY3vy6CfMm/ljR4pABfrTeHNLHUY= @@ -832,6 +833,7 @@ github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= @@ -1417,6 +1419,8 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/slack-go/slack v0.10.3 h1:kKYwlKY73AfSrtAk9UHWCXXfitudkDztNI9GYBviLxw= +github.com/slack-go/slack v0.10.3/go.mod h1:hlGi5oXA+Gt+yWTPP0plCdRKmjsDxecdHxYQdlMQKOw= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= diff --git a/internal/alerting/alerter.go b/internal/alerting/alerter.go index e2138f8..4402792 100644 --- a/internal/alerting/alerter.go +++ b/internal/alerting/alerter.go @@ -1,14 +1,6 @@ package alerting import ( - "crypto/sha256" - "errors" - "fmt" - "os" - "runtime" - "time" - - "github.com/PagerDuty/go-pagerduty" "github.com/moov-io/achgateway/internal/service" ) @@ -22,85 +14,27 @@ func (mn *MockAlerter) AlertError(e error) error { return nil } -func NewAlerter(cfg service.ErrorAlerting) (Alerter, error) { +func NewAlerters(cfg service.ErrorAlerting) ([]Alerter, error) { + var alerters []Alerter switch { + case cfg.Slack != nil: + alerter, err := NewSlackAlerter(cfg.Slack) + if err != nil { + return nil, err + } + alerters = append(alerters, alerter) + fallthrough case cfg.PagerDuty != nil: - return NewPagerDutyAlerter(cfg.PagerDuty) - } - return &MockAlerter{}, nil -} - -type PagerDuty struct { - client *pagerduty.Client - routingKey string -} - -func NewPagerDutyAlerter(cfg *service.PagerDutyAlerting) (*PagerDuty, error) { - notifier := &PagerDuty{ - client: pagerduty.NewClient(cfg.ApiKey), - routingKey: cfg.RoutingKey, - } - if err := notifier.ping(); err != nil { - return nil, err + alerter, err := NewPagerDutyAlerter(cfg.PagerDuty) + if err != nil { + return nil, err + } + alerters = append(alerters, alerter) } - return notifier, nil -} - -func (pd *PagerDuty) AlertError(e error) error { - if e == nil { - return nil - } - - details := make(map[string]string) - hostName, err := os.Hostname() - if err != nil { - return fmt.Errorf("getting host name: %v", err) + if len(alerters) == 0 { + return []Alerter{&MockAlerter{}}, nil } - dedupKey := e.Error() - if _, file, line, ok := runtime.Caller(1); ok { - location := fmt.Sprintf("%s:%d", file, line) - details["location"] = location - dedupKey += location - } - - errorHash := fmt.Sprintf("%x", sha256.Sum256([]byte(dedupKey))) - - event := &pagerduty.V2Event{ - RoutingKey: pd.routingKey, - Action: "trigger", - DedupKey: errorHash, - Payload: &pagerduty.V2Payload{ - Summary: e.Error(), - Source: hostName, - Severity: "critical", - Timestamp: time.Now().Format(time.RFC3339), - Details: details, - }, - } - - _, err = pd.client.ManageEvent(event) - if err != nil { - return fmt.Errorf("creating event in PagerDuty: %v", err) - } - - return nil -} - -func (pd *PagerDuty) ping() error { - if pd == nil || pd.client == nil { - return errors.New("pagerduty: nil") - } - - // make a call and verify we don't error - resp, err := pd.client.ListAbilities() - if err != nil { - return fmt.Errorf("pagerduty list abilities: %v", err) - } - if len(resp.Abilities) <= 0 { - return fmt.Errorf("pagerduty: missing abilities") - } - - return nil + return alerters, nil } diff --git a/internal/alerting/alerter_test.go b/internal/alerting/alerter_test.go index e4c4b67..d66ac89 100644 --- a/internal/alerting/alerter_test.go +++ b/internal/alerting/alerter_test.go @@ -9,20 +9,59 @@ import ( "github.com/stretchr/testify/require" ) -func TestPagerDutyErrorAlert(t *testing.T) { - if os.Getenv("PD_API_KEY") == "" { - t.Skip("Skip PagerDuty notification as PD_API_KEY and PD_ROUTING_KEY are not set") +func TestNewAlerters(t *testing.T) { + if os.Getenv("PD_API_KEY") == "" && os.Getenv("SLACK_ACCESS_TOKEN") == "" { + t.Skip("Skip TestNewAlerters as PD_API_KEY and SLACK_ACCESS_TOKEN are not set") } + var cfg service.ErrorAlerting + var alerters []Alerter + var err error - cfg := &service.PagerDutyAlerting{ - ApiKey: os.Getenv("PD_API_KEY"), - RoutingKey: os.Getenv("PD_ROUTING_KEY"), + if os.Getenv("PD_API_KEY") != "" { + cfg = service.ErrorAlerting{ + PagerDuty: &service.PagerDutyAlerting{ + ApiKey: os.Getenv("PD_API_KEY"), + RoutingKey: os.Getenv("PD_ROUTING_KEY"), + }, + } + + alerters, err = NewAlerters(cfg) + require.NoError(t, err) + require.Len(t, alerters, 1) + } + + if os.Getenv("SLACK_ACCESS_TOKEN") != "" { + cfg = service.ErrorAlerting{ + Slack: &service.SlackAlerting{ + AccessToken: os.Getenv("SLACK_ACCESS_TOKEN"), + ChannelID: os.Getenv("SLACK_CHANNEL_ID"), + }, + } + + alerters, err = NewAlerters(cfg) + require.NoError(t, err) + require.Len(t, alerters, 1) } - notifier, err := NewPagerDutyAlerter(cfg) - require.NoError(t, err) - require.NotNil(t, notifier) + if os.Getenv("PD_API_KEY") != "" && os.Getenv("SLACK_ACCESS_TOKEN") != "" { + cfg = service.ErrorAlerting{ + PagerDuty: &service.PagerDutyAlerting{ + ApiKey: os.Getenv("PD_API_KEY"), + RoutingKey: os.Getenv("PD_ROUTING_KEY"), + }, + Slack: &service.SlackAlerting{ + AccessToken: os.Getenv("SLACK_ACCESS_TOKEN"), + ChannelID: os.Getenv("SLACK_CHANNEL_ID"), + }, + } + + alerters, err = NewAlerters(cfg) + require.NoError(t, err) + require.Len(t, alerters, 2) - err = notifier.AlertError(errors.New("error message")) - require.NoError(t, err) + for _, alerter := range alerters { + err = alerter.AlertError(errors.New("error message")) + require.NoError(t, err) + } + } } diff --git a/internal/alerting/pagerduty.go b/internal/alerting/pagerduty.go new file mode 100644 index 0000000..da1b20e --- /dev/null +++ b/internal/alerting/pagerduty.go @@ -0,0 +1,88 @@ +package alerting + +import ( + "crypto/sha256" + "errors" + "fmt" + "os" + "runtime" + "time" + + "github.com/PagerDuty/go-pagerduty" + "github.com/moov-io/achgateway/internal/service" +) + +type PagerDuty struct { + client *pagerduty.Client + routingKey string +} + +func NewPagerDutyAlerter(cfg *service.PagerDutyAlerting) (*PagerDuty, error) { + notifier := &PagerDuty{ + client: pagerduty.NewClient(cfg.ApiKey), + routingKey: cfg.RoutingKey, + } + if err := notifier.ping(); err != nil { + return nil, err + } + return notifier, nil +} + +func (pd *PagerDuty) AlertError(e error) error { + if e == nil { + return nil + } + + details := make(map[string]string) + + hostName, err := os.Hostname() + if err != nil { + return fmt.Errorf("getting host name: %v", err) + } + + dedupKey := e.Error() + if _, file, line, ok := runtime.Caller(1); ok { + location := fmt.Sprintf("%s:%d", file, line) + details["location"] = location + dedupKey += location + } + + errorHash := fmt.Sprintf("%x", sha256.Sum256([]byte(dedupKey))) + + event := &pagerduty.V2Event{ + RoutingKey: pd.routingKey, + Action: "trigger", + DedupKey: errorHash, + Payload: &pagerduty.V2Payload{ + Summary: e.Error(), + Source: hostName, + Severity: "critical", + Timestamp: time.Now().Format(time.RFC3339), + Details: details, + }, + } + + _, err = pd.client.ManageEvent(event) + if err != nil { + return fmt.Errorf("creating event in PagerDuty: %v", err) + } + + return nil +} + +func (pd *PagerDuty) ping() error { + if pd == nil || pd.client == nil { + return errors.New("pagerduty: nil") + } + + // make a call and verify we don't error + resp, err := pd.client.ListAbilities() + if err != nil { + return fmt.Errorf("pagerduty list abilities: %v", err) + } + if len(resp.Abilities) <= 0 { + return fmt.Errorf("pagerduty: missing abilities") + } + + return nil +} diff --git a/internal/alerting/pagerduty_test.go b/internal/alerting/pagerduty_test.go new file mode 100644 index 0000000..e4c4b67 --- /dev/null +++ b/internal/alerting/pagerduty_test.go @@ -0,0 +1,28 @@ +package alerting + +import ( + "errors" + "os" + "testing" + + "github.com/moov-io/achgateway/internal/service" + "github.com/stretchr/testify/require" +) + +func TestPagerDutyErrorAlert(t *testing.T) { + if os.Getenv("PD_API_KEY") == "" { + t.Skip("Skip PagerDuty notification as PD_API_KEY and PD_ROUTING_KEY are not set") + } + + cfg := &service.PagerDutyAlerting{ + ApiKey: os.Getenv("PD_API_KEY"), + RoutingKey: os.Getenv("PD_ROUTING_KEY"), + } + + notifier, err := NewPagerDutyAlerter(cfg) + require.NoError(t, err) + require.NotNil(t, notifier) + + err = notifier.AlertError(errors.New("error message")) + require.NoError(t, err) +} diff --git a/internal/alerting/slack.go b/internal/alerting/slack.go new file mode 100644 index 0000000..390761f --- /dev/null +++ b/internal/alerting/slack.go @@ -0,0 +1,81 @@ +package alerting + +import ( + "errors" + "fmt" + + "github.com/moov-io/achgateway/internal/service" + "github.com/slack-go/slack" +) + +type Slack struct { + accessToken string + channelID string + client *slack.Client +} + +func NewSlackAlerter(cfg *service.SlackAlerting) (*Slack, error) { + notifier := &Slack{ + accessToken: cfg.AccessToken, + channelID: cfg.ChannelID, + client: slack.New(cfg.AccessToken), + } + if err := notifier.AuthTest(); err != nil { + return nil, err + } + return notifier, nil +} + +func (s *Slack) AlertError(e error) error { + if e == nil { + return nil + } + + _, _, err := s.client.PostMessage( + s.channelID, + slack.MsgOptionText(fmt.Sprintf("%v", e), false), + slack.MsgOptionAsUser(false), + ) + if err != nil { + return fmt.Errorf("sending slack message: %v", err) + } + + return nil +} + +func (s *Slack) AlertWithAttachments(msg, color string, fields []slack.AttachmentField) error { + var attachment = slack.Attachment{ + Fields: fields, + // color hex value, example: "#8E1600" + Color: color, + } + + _, _, err := s.client.PostMessage( + s.channelID, + slack.MsgOptionText(msg, false), + slack.MsgOptionAttachments(attachment), + slack.MsgOptionAsUser(false), + ) + if err != nil { + return fmt.Errorf("sending slack message: %v", err) + } + + return nil +} + +func (s *Slack) AuthTest() error { + if s == nil || s.client == nil { + return errors.New("slack: nil or no slack client") + } + + // make a call and verify we don't error + resp, err := s.client.AuthTest() + if err != nil { + return fmt.Errorf("slack auth test: %v", err) + } + if resp.UserID == "" { + return fmt.Errorf("slack: missing user_id") + } + + return nil +} diff --git a/internal/alerting/slack_test.go b/internal/alerting/slack_test.go new file mode 100644 index 0000000..2ffd533 --- /dev/null +++ b/internal/alerting/slack_test.go @@ -0,0 +1,28 @@ +package alerting + +import ( + "errors" + "os" + "testing" + + "github.com/moov-io/achgateway/internal/service" + "github.com/stretchr/testify/require" +) + +func TestSlackErrorAlert(t *testing.T) { + if os.Getenv("SLACK_ACCESS_TOKEN") == "" { + t.Skip("Skip Slack notification as SLACK_ACCESS_TOKEN and SLACK_CHANNEL_ID are not set") + } + + cfg := &service.SlackAlerting{ + AccessToken: os.Getenv("SLACK_ACCESS_TOKEN"), + ChannelID: os.Getenv("SLACK_CHANNEL_ID"), + } + + notifier, err := NewSlackAlerter(cfg) + require.NoError(t, err) + require.NotNil(t, notifier) + + err = notifier.AlertError(errors.New("error message")) + require.NoError(t, err) +} diff --git a/internal/incoming/odfi/scheduler.go b/internal/incoming/odfi/scheduler.go index f198ae0..77f2dfa 100644 --- a/internal/incoming/odfi/scheduler.go +++ b/internal/incoming/odfi/scheduler.go @@ -52,7 +52,8 @@ type PeriodicScheduler struct { downloader Downloader processors Processors - alerter alerting.Alerter + errorAlerters []alerting.Alerter + warningAlerters []alerting.Alerter } func NewPeriodicScheduler(logger log.Logger, cfg *service.Config, consul *consul.Client, processors Processors) (Scheduler, error) { @@ -65,26 +66,31 @@ func NewPeriodicScheduler(logger log.Logger, cfg *service.Config, consul *consul return nil, err } - alerter, err := alerting.NewAlerter(cfg.Errors) + errorAlerters, err := alerting.NewAlerters(cfg.Errors) if err != nil { - return nil, fmt.Errorf("ERROR creating alerter: %v", err) + return nil, fmt.Errorf("ERROR creating error alerters: %v", err) + } + warningAlerters, err := alerting.NewAlerters(cfg.Warnings) + if err != nil { + return nil, fmt.Errorf("ERROR creating warning alerters: %v", err) } ctx, cancelFunc := context.WithCancel(context.Background()) return &PeriodicScheduler{ - logger: logger, - odfi: cfg.Inbound.ODFI, - sharding: cfg.Sharding, - uploadAgents: cfg.Upload, - ticker: time.NewTicker(cfg.Inbound.ODFI.Interval), - inboundTrigger: make(chan manuallyTriggeredInbound, 1), - consul: consul, - downloader: dl, - processors: processors, - shutdown: ctx, - shutdownFunc: cancelFunc, - alerter: alerter, + logger: logger, + odfi: cfg.Inbound.ODFI, + sharding: cfg.Sharding, + uploadAgents: cfg.Upload, + ticker: time.NewTicker(cfg.Inbound.ODFI.Interval), + inboundTrigger: make(chan manuallyTriggeredInbound, 1), + consul: consul, + downloader: dl, + processors: processors, + shutdown: ctx, + shutdownFunc: cancelFunc, + errorAlerters: errorAlerters, + warningAlerters: warningAlerters, }, nil } @@ -188,13 +194,16 @@ func (s *PeriodicScheduler) tick(shard *service.Shard) error { } func (s *PeriodicScheduler) alertOnError(err error) { - if s == nil || s.alerter == nil { + if s == nil || len(s.errorAlerters) == 0 { return } if err == nil { return } - if err := s.alerter.AlertError(err); err != nil { - s.logger.LogErrorf("ERROR sending alert: %v", err) + + for _, alerter := range s.errorAlerters { + if err := alerter.AlertError(err); err != nil { + s.logger.LogErrorf("ERROR sending alert: %v", err) + } } } diff --git a/internal/pipeline/aggregate.go b/internal/pipeline/aggregate.go index 3e11c76..f40e6aa 100644 --- a/internal/pipeline/aggregate.go +++ b/internal/pipeline/aggregate.go @@ -56,7 +56,8 @@ type aggregator struct { auditStorage audittrail.Storage preuploadTransformers []transform.PreUpload outputFormatter output.Formatter - alerter alerting.Alerter + errorAlerters []alerting.Alerter + warningAlerters []alerting.Alerter } func newAggregator( @@ -66,6 +67,7 @@ func newAggregator( shard service.Shard, uploadAgents service.UploadAgents, errorAlerting service.ErrorAlerting, + warningAlerting service.ErrorAlerting, ) (*aggregator, error) { merger, err := NewMerging(logger, consul, shard, uploadAgents) if err != nil { @@ -101,9 +103,13 @@ func newAggregator( return nil, fmt.Errorf("error creating cutoffs: %v", err) } - alerter, err := alerting.NewAlerter(errorAlerting) + errorAlerters, err := alerting.NewAlerters(errorAlerting) if err != nil { - return nil, fmt.Errorf("error setting up alerter: %v", err) + return nil, fmt.Errorf("error setting up error alerters: %v", err) + } + warningAlerters, err := alerting.NewAlerters(errorAlerting) + if err != nil { + return nil, fmt.Errorf("error setting up warning alerters: %v", err) } return &aggregator{ @@ -117,7 +123,8 @@ func newAggregator( auditStorage: auditStorage, preuploadTransformers: preuploadTransformers, outputFormatter: outputFormatter, - alerter: alerter, + errorAlerters: errorAlerters, + warningAlerters: warningAlerters, }, nil } @@ -274,7 +281,7 @@ func (xfagg *aggregator) uploadFile(index int, agent upload.Agent, res *transfor // Send Slack/PD or whatever notifications after the file is uploaded if err := xfagg.notifyAfterUpload(filename, res.File, agent, err); err != nil { - xfagg.alertOnError(xfagg.logger.LogError(err).Err()) + xfagg.alertOnWarning(xfagg.logger.LogError(err).Err()) } // record our upload metrics @@ -362,13 +369,31 @@ func (xfagg *aggregator) notifyAboutHoliday(day *schedule.Day) { } func (xfagg *aggregator) alertOnError(err error) { - if xfagg == nil || xfagg.alerter == nil { + if xfagg == nil || len(xfagg.errorAlerters) == 0 { + return + } + if err == nil { + return + } + + for _, alerter := range xfagg.errorAlerters { + if err := alerter.AlertError(err); err != nil { + xfagg.logger.LogErrorf("ERROR sending alert: %v", err) + } + } +} + +func (xfagg *aggregator) alertOnWarning(err error) { + if xfagg == nil || len(xfagg.warningAlerters) == 0 { return } if err == nil { return } - if err := xfagg.alerter.AlertError(err); err != nil { - xfagg.logger.LogErrorf("ERROR sending alert: %v", err) + + for _, alerter := range xfagg.warningAlerters { + if err := alerter.AlertError(err); err != nil { + xfagg.logger.LogErrorf("ERROR sending warning alert: %v", err) + } } } diff --git a/internal/pipeline/aggregate_test.go b/internal/pipeline/aggregate_test.go index 28db600..2902189 100644 --- a/internal/pipeline/aggregate_test.go +++ b/internal/pipeline/aggregate_test.go @@ -55,8 +55,9 @@ func TestAggregateACHFile(t *testing.T) { DefaultAgentID: "ftp-live", } var errorAlerting service.ErrorAlerting + var warningAlerting service.ErrorAlerting - xfagg, err := newAggregator(log.NewNopLogger(), nil, &events.MockEmitter{}, shard, uploadAgents, errorAlerting) + xfagg, err := newAggregator(log.NewNopLogger(), nil, &events.MockEmitter{}, shard, uploadAgents, errorAlerting, warningAlerting) require.NoError(t, err) merge := &MockXferMerging{} @@ -103,8 +104,9 @@ func TestAggregate_notifyAfterUpload(t *testing.T) { DefaultAgentID: "mock-agent", } var errorAlerting service.ErrorAlerting + var warningAlerting service.ErrorAlerting - xfagg, err := newAggregator(log.NewNopLogger(), nil, &events.MockEmitter{}, shard, uploadAgents, errorAlerting) + xfagg, err := newAggregator(log.NewNopLogger(), nil, &events.MockEmitter{}, shard, uploadAgents, errorAlerting, warningAlerting) require.NoError(t, err) require.NotPanics(t, func() { @@ -137,8 +139,9 @@ func TestAggregate_notifyAfterUploadErr(t *testing.T) { DefaultAgentID: "mock-agent", } var errorAlerting service.ErrorAlerting + var warningAlerting service.ErrorAlerting - xfagg, err := newAggregator(log.NewNopLogger(), nil, &events.MockEmitter{}, shard, uploadAgents, errorAlerting) + xfagg, err := newAggregator(log.NewNopLogger(), nil, &events.MockEmitter{}, shard, uploadAgents, errorAlerting, warningAlerting) require.NoError(t, err) require.NotPanics(t, func() { diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 574c1ff..fa7022a 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -47,7 +47,7 @@ func Start( // register each shard's aggregator shardAggregators := make(map[string]*aggregator) for i := range cfg.Sharding.Shards { - xfagg, err := newAggregator(logger, consul, eventEmitter, cfg.Sharding.Shards[i], cfg.Upload, cfg.Errors) + xfagg, err := newAggregator(logger, consul, eventEmitter, cfg.Sharding.Shards[i], cfg.Upload, cfg.Errors, cfg.Warnings) if err != nil { return nil, fmt.Errorf("problem starting shard=%s: %v", cfg.Sharding.Shards[i].Name, err) } diff --git a/internal/service/model_config.go b/internal/service/model_config.go index f92bfd8..dc45045 100644 --- a/internal/service/model_config.go +++ b/internal/service/model_config.go @@ -41,6 +41,7 @@ type Config struct { Events *EventsConfig Sharding Sharding Upload UploadAgents + Warnings ErrorAlerting Errors ErrorAlerting } @@ -60,6 +61,9 @@ func (cfg *Config) Validate() error { if err := cfg.Upload.Validate(); err != nil { return fmt.Errorf("upload: %v", err) } + if err := cfg.Warnings.Validate(); err != nil { + return fmt.Errorf("warnings: %v", err) + } if err := cfg.Errors.Validate(); err != nil { return fmt.Errorf("errors: %v", err) } diff --git a/internal/service/model_errors.go b/internal/service/model_errors.go index 04dc6e2..7f8e69f 100644 --- a/internal/service/model_errors.go +++ b/internal/service/model_errors.go @@ -24,6 +24,7 @@ import ( type ErrorAlerting struct { PagerDuty *PagerDutyAlerting + Slack *SlackAlerting } func (n ErrorAlerting) Validate() error { @@ -53,3 +54,20 @@ func (cfg PagerDutyAlerting) Validate() error { } return nil } + +type SlackAlerting struct { + // Oauth 2.0 access tokens are generated manually when creating a slack app + // https://api.slack.com/authentication/token-types + AccessToken string + + // A default channel can be specified when creating a Slack app, and this config + // can override or be used as the default + ChannelID string +} + +func (cfg SlackAlerting) Validate() error { + if cfg.AccessToken == "" { + return errors.New("slack error alerting: access token is missing") + } + return nil +}