diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index ba3192f..f2c2380 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -259,6 +259,9 @@ ACHGateway: PagerDuty: ApiKey: RoutingKey: + Slack: + AccessToken: + ChannelID: Mock: Enabled: ``` diff --git a/go.mod b/go.mod index 3aac7fa..787dcec 100644 --- a/go.mod +++ b/go.mod @@ -42,6 +42,7 @@ require ( github.com/prometheus/client_golang v1.12.2 github.com/robfig/cron/v3 v3.0.1 github.com/sethvargo/go-retry v0.1.0 + github.com/slack-go/slack v0.10.3 // indirect github.com/spf13/viper v1.11.0 github.com/stretchr/objx v0.3.0 // indirect github.com/stretchr/testify v1.7.1 diff --git a/go.sum b/go.sum index 897bdad..40df524 100644 --- a/go.sum +++ b/go.sum @@ -633,6 +633,7 @@ github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfC github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= +github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/go-zookeeper/zk v1.0.2/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw= github.com/gobuffalo/attrs v0.0.0-20190224210810-a9411de4debd/go.mod h1:4duuawTqi2wkkpB4ePgWMaai6/Kc6WEz83bhFwpHzj0= github.com/gobuffalo/depgen v0.0.0-20190329151759-d478694a28d3/go.mod h1:3STtPUQYuzV0gBVOY3vy6CfMm/ljR4pABfrTeHNLHUY= @@ -832,6 +833,7 @@ github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= @@ -1417,6 +1419,8 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/slack-go/slack v0.10.3 h1:kKYwlKY73AfSrtAk9UHWCXXfitudkDztNI9GYBviLxw= +github.com/slack-go/slack v0.10.3/go.mod h1:hlGi5oXA+Gt+yWTPP0plCdRKmjsDxecdHxYQdlMQKOw= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= diff --git a/internal/alerting/alerter.go b/internal/alerting/alerter.go index e2138f8..0db7cf8 100644 --- a/internal/alerting/alerter.go +++ b/internal/alerting/alerter.go @@ -1,17 +1,13 @@ package alerting import ( - "crypto/sha256" - "errors" "fmt" - "os" - "runtime" - "time" - "github.com/PagerDuty/go-pagerduty" "github.com/moov-io/achgateway/internal/service" ) +type Alerters []Alerter + type Alerter interface { AlertError(err error) error } @@ -22,84 +18,41 @@ func (mn *MockAlerter) AlertError(e error) error { return nil } -func NewAlerter(cfg service.ErrorAlerting) (Alerter, error) { +func NewAlerters(cfg service.ErrorAlerting) (Alerters, error) { + var alerters []Alerter switch { + case cfg.Slack != nil: + alerter, err := NewSlackAlerter(cfg.Slack) + if err != nil { + return nil, err + } + alerters = append(alerters, alerter) + fallthrough case cfg.PagerDuty != nil: - return NewPagerDutyAlerter(cfg.PagerDuty) + alerter, err := NewPagerDutyAlerter(cfg.PagerDuty) + if err != nil { + return nil, err + } + alerters = append(alerters, alerter) } - return &MockAlerter{}, nil -} - -type PagerDuty struct { - client *pagerduty.Client - routingKey string -} -func NewPagerDutyAlerter(cfg *service.PagerDutyAlerting) (*PagerDuty, error) { - notifier := &PagerDuty{ - client: pagerduty.NewClient(cfg.ApiKey), - routingKey: cfg.RoutingKey, - } - if err := notifier.ping(); err != nil { - return nil, err + if len(alerters) == 0 { + return []Alerter{&MockAlerter{}}, nil } - return notifier, nil + + return alerters, nil } -func (pd *PagerDuty) AlertError(e error) error { +func (s Alerters) AlertError(e error) error { if e == nil { return nil } - details := make(map[string]string) - - hostName, err := os.Hostname() - if err != nil { - return fmt.Errorf("getting host name: %v", err) - } - - dedupKey := e.Error() - if _, file, line, ok := runtime.Caller(1); ok { - location := fmt.Sprintf("%s:%d", file, line) - details["location"] = location - dedupKey += location - } - - errorHash := fmt.Sprintf("%x", sha256.Sum256([]byte(dedupKey))) - - event := &pagerduty.V2Event{ - RoutingKey: pd.routingKey, - Action: "trigger", - DedupKey: errorHash, - Payload: &pagerduty.V2Payload{ - Summary: e.Error(), - Source: hostName, - Severity: "critical", - Timestamp: time.Now().Format(time.RFC3339), - Details: details, - }, - } - - _, err = pd.client.ManageEvent(event) - if err != nil { - return fmt.Errorf("creating event in PagerDuty: %v", err) - } - - return nil -} - -func (pd *PagerDuty) ping() error { - if pd == nil || pd.client == nil { - return errors.New("pagerduty: nil") - } - - // make a call and verify we don't error - resp, err := pd.client.ListAbilities() - if err != nil { - return fmt.Errorf("pagerduty list abilities: %v", err) - } - if len(resp.Abilities) <= 0 { - return fmt.Errorf("pagerduty: missing abilities") + for _, alerter := range s { + err := alerter.AlertError(e) + if err != nil { + return fmt.Errorf("alerting error: %v", err) + } } return nil diff --git a/internal/alerting/alerter_test.go b/internal/alerting/alerter_test.go index e4c4b67..02aa427 100644 --- a/internal/alerting/alerter_test.go +++ b/internal/alerting/alerter_test.go @@ -9,20 +9,69 @@ import ( "github.com/stretchr/testify/require" ) -func TestPagerDutyErrorAlert(t *testing.T) { +func TestNewAlertersPagerDuty(t *testing.T) { if os.Getenv("PD_API_KEY") == "" { - t.Skip("Skip PagerDuty notification as PD_API_KEY and PD_ROUTING_KEY are not set") + t.Skip("Skip TestNewAlertersPagerDuty as PD_API_KEY is not set") } + var cfg service.ErrorAlerting + var alerters []Alerter + var err error - cfg := &service.PagerDutyAlerting{ - ApiKey: os.Getenv("PD_API_KEY"), - RoutingKey: os.Getenv("PD_ROUTING_KEY"), + cfg = service.ErrorAlerting{ + PagerDuty: &service.PagerDutyAlerting{ + ApiKey: os.Getenv("PD_API_KEY"), + RoutingKey: os.Getenv("PD_ROUTING_KEY"), + }, } - notifier, err := NewPagerDutyAlerter(cfg) + alerters, err = NewAlerters(cfg) require.NoError(t, err) - require.NotNil(t, notifier) + require.Len(t, alerters, 1) +} + +func TestNewAlertersSlack(t *testing.T) { + if os.Getenv("SLACK_ACCESS_TOKEN") == "" { + t.Skip("Skip TestNewAlertersSlack as SLACK_ACCESS_TOKEN is not set") + } + var cfg service.ErrorAlerting + var alerters []Alerter + var err error + + cfg = service.ErrorAlerting{ + Slack: &service.SlackAlerting{ + AccessToken: os.Getenv("SLACK_ACCESS_TOKEN"), + ChannelID: os.Getenv("SLACK_CHANNEL_ID"), + }, + } + + alerters, err = NewAlerters(cfg) + require.NoError(t, err) + require.Len(t, alerters, 1) +} + +func TestNewAlertersPagerDutyAndSlack(t *testing.T) { + if os.Getenv("PD_API_KEY") == "" && os.Getenv("SLACK_ACCESS_TOKEN") == "" { + t.Skip("Skip as PD_API_KEY and SLACK_ACCESS_TOKEN are not set") + } + var cfg service.ErrorAlerting + var alerters Alerters + var err error + + cfg = service.ErrorAlerting{ + PagerDuty: &service.PagerDutyAlerting{ + ApiKey: os.Getenv("PD_API_KEY"), + RoutingKey: os.Getenv("PD_ROUTING_KEY"), + }, + Slack: &service.SlackAlerting{ + AccessToken: os.Getenv("SLACK_ACCESS_TOKEN"), + ChannelID: os.Getenv("SLACK_CHANNEL_ID"), + }, + } + + alerters, err = NewAlerters(cfg) + require.NoError(t, err) + require.Len(t, alerters, 2) - err = notifier.AlertError(errors.New("error message")) + err = alerters.AlertError(errors.New("error message")) require.NoError(t, err) } diff --git a/internal/alerting/pagerduty.go b/internal/alerting/pagerduty.go new file mode 100644 index 0000000..da1b20e --- /dev/null +++ b/internal/alerting/pagerduty.go @@ -0,0 +1,88 @@ +package alerting + +import ( + "crypto/sha256" + "errors" + "fmt" + "os" + "runtime" + "time" + + "github.com/PagerDuty/go-pagerduty" + "github.com/moov-io/achgateway/internal/service" +) + +type PagerDuty struct { + client *pagerduty.Client + routingKey string +} + +func NewPagerDutyAlerter(cfg *service.PagerDutyAlerting) (*PagerDuty, error) { + notifier := &PagerDuty{ + client: pagerduty.NewClient(cfg.ApiKey), + routingKey: cfg.RoutingKey, + } + if err := notifier.ping(); err != nil { + return nil, err + } + return notifier, nil +} + +func (pd *PagerDuty) AlertError(e error) error { + if e == nil { + return nil + } + + details := make(map[string]string) + + hostName, err := os.Hostname() + if err != nil { + return fmt.Errorf("getting host name: %v", err) + } + + dedupKey := e.Error() + if _, file, line, ok := runtime.Caller(1); ok { + location := fmt.Sprintf("%s:%d", file, line) + details["location"] = location + dedupKey += location + } + + errorHash := fmt.Sprintf("%x", sha256.Sum256([]byte(dedupKey))) + + event := &pagerduty.V2Event{ + RoutingKey: pd.routingKey, + Action: "trigger", + DedupKey: errorHash, + Payload: &pagerduty.V2Payload{ + Summary: e.Error(), + Source: hostName, + Severity: "critical", + Timestamp: time.Now().Format(time.RFC3339), + Details: details, + }, + } + + _, err = pd.client.ManageEvent(event) + if err != nil { + return fmt.Errorf("creating event in PagerDuty: %v", err) + } + + return nil +} + +func (pd *PagerDuty) ping() error { + if pd == nil || pd.client == nil { + return errors.New("pagerduty: nil") + } + + // make a call and verify we don't error + resp, err := pd.client.ListAbilities() + if err != nil { + return fmt.Errorf("pagerduty list abilities: %v", err) + } + if len(resp.Abilities) <= 0 { + return fmt.Errorf("pagerduty: missing abilities") + } + + return nil +} diff --git a/internal/alerting/pagerduty_test.go b/internal/alerting/pagerduty_test.go new file mode 100644 index 0000000..e4c4b67 --- /dev/null +++ b/internal/alerting/pagerduty_test.go @@ -0,0 +1,28 @@ +package alerting + +import ( + "errors" + "os" + "testing" + + "github.com/moov-io/achgateway/internal/service" + "github.com/stretchr/testify/require" +) + +func TestPagerDutyErrorAlert(t *testing.T) { + if os.Getenv("PD_API_KEY") == "" { + t.Skip("Skip PagerDuty notification as PD_API_KEY and PD_ROUTING_KEY are not set") + } + + cfg := &service.PagerDutyAlerting{ + ApiKey: os.Getenv("PD_API_KEY"), + RoutingKey: os.Getenv("PD_ROUTING_KEY"), + } + + notifier, err := NewPagerDutyAlerter(cfg) + require.NoError(t, err) + require.NotNil(t, notifier) + + err = notifier.AlertError(errors.New("error message")) + require.NoError(t, err) +} diff --git a/internal/alerting/slack.go b/internal/alerting/slack.go new file mode 100644 index 0000000..2733455 --- /dev/null +++ b/internal/alerting/slack.go @@ -0,0 +1,61 @@ +package alerting + +import ( + "errors" + "fmt" + + "github.com/moov-io/achgateway/internal/service" + "github.com/slack-go/slack" +) + +type Slack struct { + accessToken string + channelID string + client *slack.Client +} + +func NewSlackAlerter(cfg *service.SlackAlerting) (*Slack, error) { + notifier := &Slack{ + accessToken: cfg.AccessToken, + channelID: cfg.ChannelID, + client: slack.New(cfg.AccessToken), + } + if err := notifier.AuthTest(); err != nil { + return nil, err + } + return notifier, nil +} + +func (s *Slack) AlertError(e error) error { + if e == nil { + return nil + } + + _, _, err := s.client.PostMessage( + s.channelID, + slack.MsgOptionText(fmt.Sprintf("%v", e), false), + slack.MsgOptionAsUser(false), + ) + if err != nil { + return fmt.Errorf("sending slack message: %v", err) + } + + return nil +} + +func (s *Slack) AuthTest() error { + if s == nil || s.client == nil { + return errors.New("slack: nil or no slack client") + } + + // make a call and verify we don't error + resp, err := s.client.AuthTest() + if err != nil { + return fmt.Errorf("slack auth test: %v", err) + } + if resp.UserID == "" { + return fmt.Errorf("slack: missing user_id") + } + + return nil +} diff --git a/internal/alerting/slack_test.go b/internal/alerting/slack_test.go new file mode 100644 index 0000000..2ffd533 --- /dev/null +++ b/internal/alerting/slack_test.go @@ -0,0 +1,28 @@ +package alerting + +import ( + "errors" + "os" + "testing" + + "github.com/moov-io/achgateway/internal/service" + "github.com/stretchr/testify/require" +) + +func TestSlackErrorAlert(t *testing.T) { + if os.Getenv("SLACK_ACCESS_TOKEN") == "" { + t.Skip("Skip Slack notification as SLACK_ACCESS_TOKEN and SLACK_CHANNEL_ID are not set") + } + + cfg := &service.SlackAlerting{ + AccessToken: os.Getenv("SLACK_ACCESS_TOKEN"), + ChannelID: os.Getenv("SLACK_CHANNEL_ID"), + } + + notifier, err := NewSlackAlerter(cfg) + require.NoError(t, err) + require.NotNil(t, notifier) + + err = notifier.AlertError(errors.New("error message")) + require.NoError(t, err) +} diff --git a/internal/incoming/odfi/scheduler.go b/internal/incoming/odfi/scheduler.go index f198ae0..a89916e 100644 --- a/internal/incoming/odfi/scheduler.go +++ b/internal/incoming/odfi/scheduler.go @@ -52,7 +52,7 @@ type PeriodicScheduler struct { downloader Downloader processors Processors - alerter alerting.Alerter + alerters alerting.Alerters } func NewPeriodicScheduler(logger log.Logger, cfg *service.Config, consul *consul.Client, processors Processors) (Scheduler, error) { @@ -65,9 +65,9 @@ func NewPeriodicScheduler(logger log.Logger, cfg *service.Config, consul *consul return nil, err } - alerter, err := alerting.NewAlerter(cfg.Errors) + alerters, err := alerting.NewAlerters(cfg.Errors) if err != nil { - return nil, fmt.Errorf("ERROR creating alerter: %v", err) + return nil, fmt.Errorf("ERROR creating alerters: %v", err) } ctx, cancelFunc := context.WithCancel(context.Background()) @@ -84,7 +84,7 @@ func NewPeriodicScheduler(logger log.Logger, cfg *service.Config, consul *consul processors: processors, shutdown: ctx, shutdownFunc: cancelFunc, - alerter: alerter, + alerters: alerters, }, nil } @@ -188,13 +188,14 @@ func (s *PeriodicScheduler) tick(shard *service.Shard) error { } func (s *PeriodicScheduler) alertOnError(err error) { - if s == nil || s.alerter == nil { + if s == nil { return } if err == nil { return } - if err := s.alerter.AlertError(err); err != nil { + + if err := s.alerters.AlertError(err); err != nil { s.logger.LogErrorf("ERROR sending alert: %v", err) } } diff --git a/internal/pipeline/aggregate.go b/internal/pipeline/aggregate.go index 3e11c76..99ca97d 100644 --- a/internal/pipeline/aggregate.go +++ b/internal/pipeline/aggregate.go @@ -56,7 +56,7 @@ type aggregator struct { auditStorage audittrail.Storage preuploadTransformers []transform.PreUpload outputFormatter output.Formatter - alerter alerting.Alerter + alerters alerting.Alerters } func newAggregator( @@ -101,9 +101,9 @@ func newAggregator( return nil, fmt.Errorf("error creating cutoffs: %v", err) } - alerter, err := alerting.NewAlerter(errorAlerting) + alerters, err := alerting.NewAlerters(errorAlerting) if err != nil { - return nil, fmt.Errorf("error setting up alerter: %v", err) + return nil, fmt.Errorf("error setting up alerters: %v", err) } return &aggregator{ @@ -117,7 +117,7 @@ func newAggregator( auditStorage: auditStorage, preuploadTransformers: preuploadTransformers, outputFormatter: outputFormatter, - alerter: alerter, + alerters: alerters, }, nil } @@ -362,13 +362,14 @@ func (xfagg *aggregator) notifyAboutHoliday(day *schedule.Day) { } func (xfagg *aggregator) alertOnError(err error) { - if xfagg == nil || xfagg.alerter == nil { + if xfagg == nil { return } if err == nil { return } - if err := xfagg.alerter.AlertError(err); err != nil { + + if err := xfagg.alerters.AlertError(err); err != nil { xfagg.logger.LogErrorf("ERROR sending alert: %v", err) } } diff --git a/internal/service/model_errors.go b/internal/service/model_errors.go index 04dc6e2..7f8e69f 100644 --- a/internal/service/model_errors.go +++ b/internal/service/model_errors.go @@ -24,6 +24,7 @@ import ( type ErrorAlerting struct { PagerDuty *PagerDutyAlerting + Slack *SlackAlerting } func (n ErrorAlerting) Validate() error { @@ -53,3 +54,20 @@ func (cfg PagerDutyAlerting) Validate() error { } return nil } + +type SlackAlerting struct { + // Oauth 2.0 access tokens are generated manually when creating a slack app + // https://api.slack.com/authentication/token-types + AccessToken string + + // A default channel can be specified when creating a Slack app, and this config + // can override or be used as the default + ChannelID string +} + +func (cfg SlackAlerting) Validate() error { + if cfg.AccessToken == "" { + return errors.New("slack error alerting: access token is missing") + } + return nil +}