Skip to content

Commit

Permalink
Implement GCP notifications processor (flyteorg#259)
Browse files Browse the repository at this point in the history
  • Loading branch information
ariefrahmansyah authored Sep 24, 2021
1 parent 6891d9f commit bca6228
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 62 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/pquerna/cachecontrol v0.0.0-20201205024021-ac21108117ac // indirect
github.com/prometheus/client_golang v1.9.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.19.0 // indirect
github.com/qor/qor v1.2.0 // indirect
github.com/qor/validations v0.0.0-20171228122639-f364bca61b46
Expand Down
18 changes: 17 additions & 1 deletion pkg/async/notifications/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,30 @@ func NewNotificationsProcessor(config runtimeInterfaces.NotificationsConfig, sco
panic(err)
}
emailer = GetEmailer(config, scope)
return implementations.NewProcessor(sub, emailer, scope)
case common.GCP:
projectID := config.GCPConfig.ProjectID
subscription := config.NotificationsProcessorConfig.QueueName
var err error
err = async.Retry(reconnectAttempts, reconnectDelay, func() error {
sub, err = gizmoGCP.NewSubscriber(context.TODO(), projectID, subscription)
if err != nil {
logger.Warnf(context.TODO(), "Failed to initialize new gizmo gcp subscriber with config [ProjectID: %s, Subscription: %s] and err: %v", projectID, subscription, err)
}
return err
})
if err != nil {
panic(err)
}
emailer = GetEmailer(config, scope)
return implementations.NewGcpProcessor(sub, emailer, scope)
case common.Local:
fallthrough
default:
logger.Infof(context.Background(),
"Using default noop notifications processor implementation for config type [%s]", config.Type)
return implementations.NewNoopProcess()
}
return implementations.NewProcessor(sub, emailer, scope)
}

func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, scope promutils.Scope) interfaces.Publisher {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,19 @@ package implementations

import (
"context"
"time"

"github.com/flyteorg/flyteadmin/pkg/async"

"github.com/flyteorg/flyteadmin/pkg/async/notifications/interfaces"

"encoding/base64"
"encoding/json"
"time"

"github.com/NYTimes/gizmo/pubsub"
"github.com/flyteorg/flyteadmin/pkg/async"
"github.com/flyteorg/flyteadmin/pkg/async/notifications/interfaces"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/golang/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
)

// processorSystemMetrics groups the Prometheus counters a notifications processor increments while
// consuming messages. All counters are created by newProcessorSystemMetrics.
type processorSystemMetrics struct {
// Scope is the metrics scope the counters below were created from.
Scope promutils.Scope
// MessageTotal is the overall count of messages processed.
MessageTotal prometheus.Counter
// MessageDoneError counts errors returned when marking a message as done.
MessageDoneError prometheus.Counter
// MessageDecodingError counts messages that could not be decoded.
MessageDecodingError prometheus.Counter
// MessageDataError counts data processing errors hit while preparing a message to be notified.
MessageDataError prometheus.Counter
// MessageProcessorError counts errors when interacting with the notification processor.
MessageProcessorError prometheus.Counter
// MessageSuccess counts messages successfully handled by the underlying notification mechanism.
MessageSuccess prometheus.Counter
// ChannelClosedError counts channel closing errors.
ChannelClosedError prometheus.Counter
// StopError counts errors encountered in the Stop() method.
StopError prometheus.Counter
}

// TODO: Add a counter that encompasses the publisher stats grouped by project and domain.
type Processor struct {
sub pubsub.Subscriber
Expand All @@ -54,11 +38,10 @@ func (p *Processor) run() error {
var emailMessage admin.EmailMessage
var err error
for msg := range p.sub.Start() {

p.systemMetrics.MessageTotal.Inc()
// Currently this is safe because Gizmo takes a string and casts it to a byte array.
var stringMsg = string(msg.Message())
// Amazon doesn't provide a struct that can be used to unmarshall into. A generic JSON struct is used in its place.
stringMsg := string(msg.Message())

var snsJSONFormat map[string]interface{}

// At Lyft, SNS populates SQS. This results in the message body of SQS having the SNS message format.
Expand Down Expand Up @@ -146,23 +129,6 @@ func (p *Processor) StopProcessing() error {
return err
}

// newProcessorSystemMetrics registers every processor counter under the supplied scope and returns them
// bundled in a processorSystemMetrics value. Counters are created with MustNewCounter, in the same order
// as before.
func newProcessorSystemMetrics(scope promutils.Scope) processorSystemMetrics {
	var metrics processorSystemMetrics
	metrics.Scope = scope

	metrics.MessageTotal = scope.MustNewCounter("message_total", "overall count of messages processed")
	metrics.MessageDecodingError = scope.MustNewCounter("message_decoding_error", "count of messages with decoding errors")
	metrics.MessageDataError = scope.MustNewCounter("message_data_error", "count of message data processing errors experience when preparing the message to be notified.")
	metrics.MessageDoneError = scope.MustNewCounter("message_done_error", "count of message errors when marking it as done with underlying processor")
	metrics.MessageProcessorError = scope.MustNewCounter("message_processing_error", "count of errors when interacting with notification processor")
	metrics.MessageSuccess = scope.MustNewCounter("message_ok", "count of messages successfully processed by underlying notification mechanism")
	metrics.ChannelClosedError = scope.MustNewCounter("channel_closed_error", "count of channel closing errors")
	metrics.StopError = scope.MustNewCounter("stop_error", "count of errors in Stop() method")

	return metrics
}

func NewProcessor(sub pubsub.Subscriber, emailer interfaces.Emailer, scope promutils.Scope) interfaces.Processor {
return &Processor{
sub: sub,
Expand Down
91 changes: 91 additions & 0 deletions pkg/async/notifications/implementations/gcp_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package implementations

import (
"context"
"time"

"github.com/NYTimes/gizmo/pubsub"
"github.com/flyteorg/flyteadmin/pkg/async"
"github.com/flyteorg/flyteadmin/pkg/async/notifications/interfaces"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/golang/protobuf/proto"
)

// GcpProcessor consumes messages from a pubsub subscriber, decodes each one as an admin.EmailMessage
// and hands it to an Emailer, recording the outcome in its counters.
//
// TODO: Add a counter that encompasses the publisher stats grouped by project and domain.
type GcpProcessor struct {
// sub is the subscriber whose Start() channel is drained by run() and which StopProcessing() stops.
sub pubsub.Subscriber
// email sends each successfully decoded message.
email interfaces.Emailer
// systemMetrics holds the counters incremented while processing.
systemMetrics processorSystemMetrics
}

// NewGcpProcessor builds a GcpProcessor around the given subscriber and emailer. Its counters are
// registered under a "gcp_processor" sub-scope of scope.
func NewGcpProcessor(sub pubsub.Subscriber, emailer interfaces.Emailer, scope promutils.Scope) interfaces.Processor {
	metricsScope := scope.NewSubScope("gcp_processor")

	processor := new(GcpProcessor)
	processor.sub = sub
	processor.email = emailer
	processor.systemMetrics = newProcessorSystemMetrics(metricsScope)
	return processor
}

// StartProcessing runs the processor forever: each time run() returns, it logs the outcome, sleeps for
// async.RetryDelay and starts again. It never returns.
func (p *GcpProcessor) StartProcessing() {
	for {
		logger.Warningf(context.Background(), "Starting GCP notifications processor")
		// run() returns nil when the subscriber channel closes without an error, so only report an
		// error when there actually is one instead of logging "err: [<nil>]" at error level.
		if err := p.run(); err != nil {
			logger.Errorf(context.Background(), "error with running GCP processor err: [%v] ", err)
		} else {
			logger.Warningf(context.Background(), "GCP notifications processor subscriber channel closed without an error")
		}
		time.Sleep(async.RetryDelay)
	}
}

// run drains the subscriber's message channel. Each message is counted, decoded as an admin.EmailMessage
// and passed to the emailer; the message is marked done whether decoding or sending succeeded or not.
// Once the channel closes, run returns the subscriber's Err() value (nil when there is none).
func (p *GcpProcessor) run() error {
	ctx := context.Background()
	var notification admin.EmailMessage

	for incoming := range p.sub.Start() {
		p.systemMetrics.MessageTotal.Inc()

		decodeErr := proto.Unmarshal(incoming.Message(), &notification)
		if decodeErr != nil {
			// Undecodable messages are still marked done before moving on to the next one.
			logger.Debugf(ctx, "failed to unmarshal to notification object message [%s] with err: %v", string(incoming.Message()), decodeErr)
			p.systemMetrics.MessageDecodingError.Inc()
			p.markMessageDone(incoming)
			continue
		}

		sendErr := p.email.SendEmail(ctx, notification)
		if sendErr == nil {
			p.systemMetrics.MessageSuccess.Inc()
		} else {
			p.systemMetrics.MessageProcessorError.Inc()
			logger.Errorf(ctx, "Error sending an email message for message [%s] with emailM with err: %v", notification.String(), sendErr)
		}

		p.markMessageDone(incoming)
	}

	// According to https://github.com/NYTimes/gizmo/blob/f2b3deec03175b11cdfb6642245a49722751357f/pubsub/pubsub.go#L36-L39,
	// the channel backing the subscriber simply closes on error. Err() tells apart a close caused by an
	// error from a close because no messages are left (in which case Err() returns nil).
	streamErr := p.sub.Err()
	if streamErr == nil {
		return nil
	}

	p.systemMetrics.ChannelClosedError.Inc()
	logger.Warningf(ctx, "The stream for the subscriber channel closed with err: %v", streamErr)
	return streamErr
}

// markMessageDone calls Done() on the message. A failure is counted and logged but not returned.
func (p *GcpProcessor) markMessageDone(message pubsub.SubscriberMessage) {
	doneErr := message.Done()
	if doneErr == nil {
		return
	}

	p.systemMetrics.MessageDoneError.Inc()
	logger.Errorf(context.Background(), "failed to mark message as Done() in processor with err: %v", doneErr)
}

// StopProcessing stops the underlying subscriber. If Stop() fails, the error is counted, logged and
// returned to the caller; otherwise nil is returned.
func (p *GcpProcessor) StopProcessing() error {
	// Note: If the underlying channel is already closed, then Stop() will return an error.
	stopErr := p.sub.Stop()
	if stopErr == nil {
		return nil
	}

	p.systemMetrics.StopError.Inc()
	logger.Errorf(context.Background(), "Failed to stop the subscriber channel gracefully with err: %v", stopErr)
	return stopErr
}
113 changes: 113 additions & 0 deletions pkg/async/notifications/implementations/gcp_processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package implementations

import (
"context"
"testing"

"github.com/NYTimes/gizmo/pubsub/pubsubtest"
"github.com/flyteorg/flyteadmin/pkg/async/notifications/mocks"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/pkg/errors"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
)

var (
testGcpSubscriber pubsubtest.TestSubscriber
mockGcpEmailer mocks.MockEmailer
)

// initializeGcpSubscriber clears the message and error fields of the shared testGcpSubscriber.
// Call it at the start of every test that uses the subscriber so state from a previous test is not reused.
func initializeGcpSubscriber() {
	subscriber := &testGcpSubscriber

	// Drop any queued messages.
	subscriber.ProtoMessages = nil
	subscriber.JSONMessages = nil

	// Drop any configured or recorded errors.
	subscriber.GivenStopError = nil
	subscriber.GivenErrError = nil
	subscriber.FoundError = nil
}

// TestGcpProcessor_StartProcessing feeds the shared proto test messages through run() and checks that
// the emailer receives the expected email and that exactly one message is counted as successful.
func TestGcpProcessor_StartProcessing(t *testing.T) {
	initializeGcpSubscriber()
	testGcpSubscriber.ProtoMessages = append(testGcpSubscriber.ProtoMessages, testSubscriberProtoMessages...)

	testGcpProcessor := NewGcpProcessor(&testGcpSubscriber, &mockGcpEmailer, promutils.NewTestScope())

	sendEmailValidationFunc := func(ctx context.Context, email admin.EmailMessage) error {
		// assert.Equal takes (expected, actual); keeping that order makes failure output read correctly.
		assert.Equal(t, testEmail.Body, email.Body)
		assert.Equal(t, testEmail.RecipientsEmail, email.RecipientsEmail)
		assert.Equal(t, testEmail.SubjectLine, email.SubjectLine)
		assert.Equal(t, testEmail.SenderEmail, email.SenderEmail)
		return nil
	}
	mockGcpEmailer.SetSendEmailFunc(sendEmailValidationFunc)
	assert.Nil(t, testGcpProcessor.(*GcpProcessor).run())

	// Check for the number of messages processed.
	m := &dto.Metric{}
	err := testGcpProcessor.(*GcpProcessor).systemMetrics.MessageSuccess.Write(m)
	assert.Nil(t, err)
	// Compare the counter value itself; the text rendering of a protobuf message is not a stable format.
	assert.Equal(t, float64(1), m.GetCounter().GetValue())
}

// TestGcpProcessor_StartProcessingNoMessages checks that run() returns nil and counts no successes
// when the subscriber has no messages queued.
func TestGcpProcessor_StartProcessingNoMessages(t *testing.T) {
	initializeGcpSubscriber()

	testGcpProcessor := NewGcpProcessor(&testGcpSubscriber, &mockGcpEmailer, promutils.NewTestScope())

	// Expect no errors are returned.
	assert.Nil(t, testGcpProcessor.(*GcpProcessor).run())

	// Check for the number of messages processed.
	m := &dto.Metric{}
	err := testGcpProcessor.(*GcpProcessor).systemMetrics.MessageSuccess.Write(m)
	assert.Nil(t, err)
	// Compare the counter value itself; the text rendering of a protobuf message is not a stable format.
	assert.Equal(t, float64(0), m.GetCounter().GetValue())
}

// TestGcpProcessor_StartProcessingError checks that run() returns the error reported by the
// subscriber's Err() method.
func TestGcpProcessor_StartProcessingError(t *testing.T) {
	initializeGcpSubscriber()

	// The error set through GivenErrError is what Err() returns, and run() checks Err()
	// before it returns.
	expectedErr := errors.New("err() returned an error")
	testGcpSubscriber.GivenErrError = expectedErr

	processor := NewGcpProcessor(&testGcpSubscriber, &mockGcpEmailer, promutils.NewTestScope())
	gcpProcessor := processor.(*GcpProcessor)
	assert.Equal(t, expectedErr, gcpProcessor.run())
}

// TestGcpProcessor_StartProcessingEmailError checks that a failing emailer does not make run() fail,
// and that the failure is recorded in the MessageProcessorError counter.
func TestGcpProcessor_StartProcessingEmailError(t *testing.T) {
	initializeGcpSubscriber()
	emailError := errors.New("error sending email")
	sendEmailErrorFunc := func(ctx context.Context, email admin.EmailMessage) error {
		return emailError
	}
	mockGcpEmailer.SetSendEmailFunc(sendEmailErrorFunc)
	testGcpSubscriber.ProtoMessages = append(testGcpSubscriber.ProtoMessages, testSubscriberProtoMessages...)

	testGcpProcessor := NewGcpProcessor(&testGcpSubscriber, &mockGcpEmailer, promutils.NewTestScope())

	// Even if there is an error in sending an email, run() returns no error.
	assert.Nil(t, testGcpProcessor.(*GcpProcessor).run())

	// Check for an email error stat.
	m := &dto.Metric{}
	err := testGcpProcessor.(*GcpProcessor).systemMetrics.MessageProcessorError.Write(m)
	assert.Nil(t, err)
	// Compare the counter value itself; the text rendering of a protobuf message is not a stable format.
	assert.Equal(t, float64(1), m.GetCounter().GetValue())
}

// TestGcpProcessor_StopProcessing checks that StopProcessing() returns nil when the subscriber has no
// stop error configured.
func TestGcpProcessor_StopProcessing(t *testing.T) {
	initializeGcpSubscriber()

	processor := NewGcpProcessor(&testGcpSubscriber, &mockGcpEmailer, promutils.NewTestScope())
	stopErr := processor.StopProcessing()
	assert.Nil(t, stopErr)
}

// TestGcpProcessor_StopProcessingError checks that StopProcessing() returns the error configured on
// the subscriber through GivenStopError.
func TestGcpProcessor_StopProcessingError(t *testing.T) {
	initializeGcpSubscriber()

	expectedErr := errors.New("stop() returns an error")
	testGcpSubscriber.GivenStopError = expectedErr

	processor := NewGcpProcessor(&testGcpSubscriber, &mockGcpEmailer, promutils.NewTestScope())
	actualErr := processor.StopProcessing()
	assert.Equal(t, expectedErr, actualErr)
}
32 changes: 32 additions & 0 deletions pkg/async/notifications/implementations/processor_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package implementations

import (
"github.com/flyteorg/flytestdlib/promutils"
"github.com/prometheus/client_golang/prometheus"
)

// processorSystemMetrics groups the Prometheus counters a notifications processor increments while
// consuming messages. All counters are created by newProcessorSystemMetrics.
type processorSystemMetrics struct {
// Scope is the metrics scope the counters below were created from.
Scope promutils.Scope
// MessageTotal is the overall count of messages processed.
MessageTotal prometheus.Counter
// MessageDoneError counts errors returned when marking a message as done.
MessageDoneError prometheus.Counter
// MessageDecodingError counts messages that could not be decoded.
MessageDecodingError prometheus.Counter
// MessageDataError counts data processing errors hit while preparing a message to be notified.
MessageDataError prometheus.Counter
// MessageProcessorError counts errors when interacting with the notification processor.
MessageProcessorError prometheus.Counter
// MessageSuccess counts messages successfully handled by the underlying notification mechanism.
MessageSuccess prometheus.Counter
// ChannelClosedError counts channel closing errors.
ChannelClosedError prometheus.Counter
// StopError counts errors encountered in the Stop() method.
StopError prometheus.Counter
}

// newProcessorSystemMetrics registers every processor counter under the supplied scope and returns them
// bundled in a processorSystemMetrics value. Counters are created with MustNewCounter, in the same order
// as before.
func newProcessorSystemMetrics(scope promutils.Scope) processorSystemMetrics {
	var metrics processorSystemMetrics
	metrics.Scope = scope

	metrics.MessageTotal = scope.MustNewCounter("message_total", "overall count of messages processed")
	metrics.MessageDecodingError = scope.MustNewCounter("message_decoding_error", "count of messages with decoding errors")
	metrics.MessageDataError = scope.MustNewCounter("message_data_error", "count of message data processing errors experience when preparing the message to be notified.")
	metrics.MessageDoneError = scope.MustNewCounter("message_done_error", "count of message errors when marking it as done with underlying processor")
	metrics.MessageProcessorError = scope.MustNewCounter("message_processing_error", "count of errors when interacting with notification processor")
	metrics.MessageSuccess = scope.MustNewCounter("message_ok", "count of messages successfully processed by underlying notification mechanism")
	metrics.ChannelClosedError = scope.MustNewCounter("channel_closed_error", "count of channel closing errors")
	metrics.StopError = scope.MustNewCounter("stop_error", "count of errors in Stop() method")

	return metrics
}
Loading

0 comments on commit bca6228

Please sign in to comment.