Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Update async retry mechanisms (#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan authored Jul 7, 2020
1 parent f256e1e commit e2520eb
Show file tree
Hide file tree
Showing 12 changed files with 179 additions and 62 deletions.
34 changes: 29 additions & 5 deletions flyteadmin/pkg/async/notifications/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package notifications

import (
"context"
"time"

"github.com/lyft/flyteadmin/pkg/async"

"github.com/lyft/flyteadmin/pkg/async/notifications/implementations"
"github.com/lyft/flyteadmin/pkg/async/notifications/interfaces"
Expand Down Expand Up @@ -60,6 +63,8 @@ func GetEmailer(config runtimeInterfaces.NotificationsConfig, scope promutils.Sc
}

func NewNotificationsProcessor(config runtimeInterfaces.NotificationsConfig, scope promutils.Scope) interfaces.Processor {
reconnectAttempts := config.ReconnectAttempts
reconnectDelay := time.Duration(config.ReconnectDelaySeconds) * time.Second
var sub pubsub.Subscriber
var emailer interfaces.Emailer
switch config.Type {
Expand All @@ -73,11 +78,15 @@ func NewNotificationsProcessor(config runtimeInterfaces.NotificationsConfig, sco
ConsumeBase64: &enable64decoding,
}
sqsConfig.Region = config.Region
process, err := gizmoAWS.NewSubscriber(sqsConfig)
var err error
err = async.Retry(reconnectAttempts, reconnectDelay, func() error {
sub, err = gizmoAWS.NewSubscriber(sqsConfig)
return err
})

if err != nil {
panic(err)
}
sub = process
emailer = GetEmailer(config, scope)
case common.Local:
fallthrough
Expand All @@ -90,6 +99,8 @@ func NewNotificationsProcessor(config runtimeInterfaces.NotificationsConfig, sco
}

func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, scope promutils.Scope) interfaces.Publisher {
reconnectAttempts := config.ReconnectAttempts
reconnectDelay := time.Duration(config.ReconnectDelaySeconds) * time.Second
switch config.Type {
case common.AWS:
snsConfig := gizmoAWS.SNSConfig{
Expand All @@ -100,8 +111,15 @@ func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, sco
} else {
snsConfig.Region = config.Region
}
publisher, err := gizmoAWS.NewPublisher(snsConfig)
// Any errors initiating Publisher with Amazon configurations results in a failed start up.

var publisher pubsub.Publisher
var err error
err = async.Retry(reconnectAttempts, reconnectDelay, func() error {
publisher, err = gizmoAWS.NewPublisher(snsConfig)
return err
})

// Any persistent errors initiating Publisher with Amazon configurations results in a failed start up.
if err != nil {
panic(err)
}
Expand All @@ -111,7 +129,13 @@ func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, sco
Topic: config.NotificationsPublisherConfig.TopicName,
}
pubsubConfig.ProjectID = config.GCPConfig.ProjectID
publisher, err := gizmoGCP.NewPublisher(context.TODO(), pubsubConfig)
var publisher pubsub.MultiPublisher
var err error
err = async.Retry(reconnectAttempts, reconnectDelay, func() error {
publisher, err = gizmoGCP.NewPublisher(context.TODO(), pubsubConfig)
return err
})

if err != nil {
panic(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ func NewNoopPublish() interfaces.Publisher {

type NoopProcess struct{}

func (n *NoopProcess) StartProcessing() error {
func (n *NoopProcess) StartProcessing() {
logger.Debug(context.Background(), "call to noop start processing.")
return nil
}

func (n *NoopProcess) StopProcessing() error {
Expand Down
14 changes: 13 additions & 1 deletion flyteadmin/pkg/async/notifications/implementations/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package implementations

import (
"context"
"time"

"github.com/lyft/flyteadmin/pkg/async"

"github.com/lyft/flyteadmin/pkg/async/notifications/interfaces"

Expand Down Expand Up @@ -38,7 +41,16 @@ type Processor struct {
// Currently only email is the supported notification because slack and pagerduty both use
// email client to trigger those notifications.
// When Pagerduty and other notifications are supported, a publisher per type should be created.
func (p *Processor) StartProcessing() error {
func (p *Processor) StartProcessing() {
for {
logger.Warningf(context.Background(), "Starting notifications processor")
err := p.run()
logger.Errorf(context.Background(), "error with running processor err: [%v] ", err)
time.Sleep(async.RetryDelay)
}
}

func (p *Processor) run() error {
var emailMessage admin.EmailMessage
var err error
for msg := range p.sub.Start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ func TestProcessor_StartProcessing(t *testing.T) {
mockEmailer.SetSendEmailFunc(sendEmailValidationFunc)
// TODO Add test for metric inc for number of messages processed.
// Assert 1 message processed and 1 total.
assert.Nil(t, testProcessor.StartProcessing())
assert.Nil(t, testProcessor.(*Processor).run())
}

func TestProcessor_StartProcessingNoMessages(t *testing.T) {
initializeProcessor()
// Expect no errors are returned.
assert.Nil(t, testProcessor.StartProcessing())
assert.Nil(t, testProcessor.(*Processor).run())
// TODO add test for metric inc() for number of messages processed.
// Assert 0 messages processed and 0 total.
}
Expand All @@ -59,7 +59,7 @@ func TestProcessor_StartProcessingNoNotificationMessage(t *testing.T) {
}
initializeProcessor()
testSubscriber.JSONMessages = append(testSubscriber.JSONMessages, testMessage)
assert.Nil(t, testProcessor.StartProcessing())
assert.Nil(t, testProcessor.(*Processor).run())
// TODO add test for metric inc() for number of messages processed.
// Assert 1 messages error and 1 total.
}
Expand All @@ -72,7 +72,7 @@ func TestProcessor_StartProcessingMessageWrongDataType(t *testing.T) {
}
initializeProcessor()
testSubscriber.JSONMessages = append(testSubscriber.JSONMessages, testMessage)
assert.Nil(t, testProcessor.StartProcessing())
assert.Nil(t, testProcessor.(*Processor).run())
// TODO add test for metric inc() for number of messages processed.
// Assert 1 messages error and 1 total.
}
Expand All @@ -85,7 +85,7 @@ func TestProcessor_StartProcessingBase64DecodeError(t *testing.T) {
}
initializeProcessor()
testSubscriber.JSONMessages = append(testSubscriber.JSONMessages, testMessage)
assert.Nil(t, testProcessor.StartProcessing())
assert.Nil(t, testProcessor.(*Processor).run())
// TODO add test for metric inc() for number of messages processed.
// Assert 1 messages error and 1 total.
}
Expand All @@ -99,7 +99,7 @@ func TestProcessor_StartProcessingProtoMarshallError(t *testing.T) {
}
initializeProcessor()
testSubscriber.JSONMessages = append(testSubscriber.JSONMessages, testMessage)
assert.Nil(t, testProcessor.StartProcessing())
assert.Nil(t, testProcessor.(*Processor).run())
// TODO add test for metric inc() for number of messages processed.
// Assert 1 messages error and 1 total.
}
Expand All @@ -110,7 +110,7 @@ func TestProcessor_StartProcessingError(t *testing.T) {
// The error set by GivenErrError is returned by Err().
// Err() is checked before Run() returning.
testSubscriber.GivenErrError = ret
assert.Equal(t, ret, testProcessor.StartProcessing())
assert.Equal(t, ret, testProcessor.(*Processor).run())
}

func TestProcessor_StartProcessingEmailError(t *testing.T) {
Expand All @@ -124,7 +124,7 @@ func TestProcessor_StartProcessingEmailError(t *testing.T) {

// Even if there is an error in sending an email StartProcessing will return no errors.
// TODO: Once stats have been added check for an email error stat.
assert.Nil(t, testProcessor.StartProcessing())
assert.Nil(t, testProcessor.(*Processor).run())
}

func TestProcessor_StopProcessing(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/async/notifications/interfaces/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ type Processor interface {
// If the channel closes gracefully, no error will be returned.
// If the underlying channel experiences errors,
// an error is returned and the channel is closed.
StartProcessing() error
StartProcessing()

// This should be invoked when the application is shutting down.
// If StartProcessing() returned an error, StopProcessing() will return an error because
Expand Down
33 changes: 30 additions & 3 deletions flyteadmin/pkg/async/schedule/aws/workflow_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"fmt"
"time"

"github.com/lyft/flyteadmin/pkg/async"
runtimeInterfaces "github.com/lyft/flyteadmin/pkg/runtime/interfaces"

"github.com/lyft/flytestdlib/contextutils"

"github.com/golang/protobuf/ptypes/timestamp"
Expand Down Expand Up @@ -42,6 +45,7 @@ type workflowExecutorMetrics struct {
MessageReceivedDelay labeled.StopWatch
ScheduledEventProcessingDelay labeled.StopWatch
CreateExecutionDuration labeled.StopWatch
ChannelClosedError prometheus.Counter
}

type workflowExecutor struct {
Expand Down Expand Up @@ -168,6 +172,15 @@ func (e *workflowExecutor) formulateExecutionCreateRequest(
}

func (e *workflowExecutor) Run() {
for {
logger.Warningf(context.Background(), "Starting workflow executor")
err := e.run()
logger.Errorf(context.Background(), "error with workflow executor err: [%v] ", err)
time.Sleep(async.RetryDelay)
}
}

func (e *workflowExecutor) run() error {
for message := range e.subscriber.Start() {
scheduledWorkflowExecutionRequest, err := DeserializeScheduleWorkflowPayload(message.Message())
ctx := context.Background()
Expand Down Expand Up @@ -243,7 +256,11 @@ func (e *workflowExecutor) Run() {
observedMessageTriggeredTime)
}
err := e.subscriber.Err()
logger.Errorf(context.TODO(), "Gizmo subscriber closed channel with err: [%+v]", err)
if err != nil {
logger.Errorf(context.TODO(), "Gizmo subscriber closed channel with err: [%+v]", err)
e.metrics.ChannelClosedError.Inc()
}
return err
}

func (e *workflowExecutor) Stop() error {
Expand Down Expand Up @@ -286,18 +303,28 @@ func newWorkflowExecutorMetrics(scope promutils.Scope) workflowExecutorMetrics {
CreateExecutionDuration: labeled.NewStopWatch("create_execution_duration",
"time spent waiting on the call to CreateExecution to return",
time.Second, scope, labeled.EmitUnlabeledMetric),
ChannelClosedError: scope.MustNewCounter("channel_closed_error", "count of channel closing errors"),
}
}

func NewWorkflowExecutor(
config aws.SQSConfig, executionManager interfaces.ExecutionInterface,
config aws.SQSConfig, schedulerConfig runtimeInterfaces.SchedulerConfig, executionManager interfaces.ExecutionInterface,
launchPlanManager interfaces.LaunchPlanInterface, scope promutils.Scope) scheduleInterfaces.WorkflowExecutor {

config.TimeoutSeconds = &timeout
// By default gizmo tries to base64 decode messages. Since we don't use the gizmo publisher interface to publish
// messages these are not encoded in base64 by default. Disable this behavior.
config.ConsumeBase64 = &doNotconsumeBase64
subscriber, err := aws.NewSubscriber(config)

maxReconnectAttempts := schedulerConfig.ReconnectAttempts
reconnectDelay := time.Duration(schedulerConfig.ReconnectDelaySeconds) * time.Second
var subscriber pubsub.Subscriber
var err error
err = async.Retry(maxReconnectAttempts, reconnectDelay, func() error {
subscriber, err = aws.NewSubscriber(config)
return err
})

if err != nil {
scope.MustNewCounter(
"initialize_executor_failed", "failures initializing scheduled workflow executor").Inc()
Expand Down
3 changes: 2 additions & 1 deletion flyteadmin/pkg/async/schedule/aws/workflow_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,9 @@ func TestRun(t *testing.T) {
}, nil
})
testExecutor := newWorkflowExecutorForTest(&testSubscriber, &testExecutionManager, launchPlanManager)
testExecutor.Run()
err := testExecutor.run()
assert.Len(t, messages, messagesSeen)
assert.Nil(t, err)
}

func TestStop(t *testing.T) {
Expand Down
39 changes: 24 additions & 15 deletions flyteadmin/pkg/async/schedule/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package schedule

import (
"context"
"time"

"github.com/lyft/flyteadmin/pkg/async"

gizmoConfig "github.com/NYTimes/gizmo/pubsub/aws"
"github.com/aws/aws-sdk-go/aws"
Expand All @@ -17,10 +20,9 @@ import (
)

type WorkflowSchedulerConfig struct {
Retries int
EventSchedulerConfig runtimeInterfaces.EventSchedulerConfig
WorkflowExecutorConfig runtimeInterfaces.WorkflowExecutorConfig
Scope promutils.Scope
Retries int
SchedulerConfig runtimeInterfaces.SchedulerConfig
Scope promutils.Scope
}

type WorkflowScheduler interface {
Expand All @@ -44,12 +46,12 @@ func (w *workflowScheduler) GetWorkflowExecutor(
launchPlanManager managerInterfaces.LaunchPlanInterface) interfaces.WorkflowExecutor {
if w.workflowExecutor == nil {
sqsConfig := gizmoConfig.SQSConfig{
QueueName: w.cfg.WorkflowExecutorConfig.ScheduleQueueName,
QueueOwnerAccountID: w.cfg.WorkflowExecutorConfig.AccountID,
QueueName: w.cfg.SchedulerConfig.WorkflowExecutorConfig.ScheduleQueueName,
QueueOwnerAccountID: w.cfg.SchedulerConfig.WorkflowExecutorConfig.AccountID,
}
sqsConfig.Region = w.cfg.WorkflowExecutorConfig.Region
sqsConfig.Region = w.cfg.SchedulerConfig.WorkflowExecutorConfig.Region
w.workflowExecutor = awsSchedule.NewWorkflowExecutor(
sqsConfig, executionManager, launchPlanManager, w.cfg.Scope.NewSubScope("workflow_executor"))
sqsConfig, w.cfg.SchedulerConfig, executionManager, launchPlanManager, w.cfg.Scope.NewSubScope("workflow_executor"))
}
return w.workflowExecutor
}
Expand All @@ -58,26 +60,33 @@ func NewWorkflowScheduler(cfg WorkflowSchedulerConfig) WorkflowScheduler {
var eventScheduler interfaces.EventScheduler
var workflowExecutor interfaces.WorkflowExecutor

switch cfg.EventSchedulerConfig.Scheme {
switch cfg.SchedulerConfig.EventSchedulerConfig.Scheme {
case common.AWS:
awsConfig := aws.NewConfig().WithRegion(cfg.WorkflowExecutorConfig.Region).WithMaxRetries(cfg.Retries)
sess, err := session.NewSession(awsConfig)
awsConfig := aws.NewConfig().WithRegion(cfg.SchedulerConfig.WorkflowExecutorConfig.Region).WithMaxRetries(cfg.Retries)
var sess *session.Session
var err error
err = async.Retry(cfg.SchedulerConfig.ReconnectAttempts,
time.Duration(cfg.SchedulerConfig.ReconnectDelaySeconds)*time.Second, func() error {
sess, err = session.NewSession(awsConfig)
return err
})

if err != nil {
panic(err)
}
eventScheduler = awsSchedule.NewCloudWatchScheduler(
cfg.EventSchedulerConfig.ScheduleRole, cfg.EventSchedulerConfig.TargetName, sess, awsConfig,
cfg.SchedulerConfig.EventSchedulerConfig.ScheduleRole, cfg.SchedulerConfig.EventSchedulerConfig.TargetName, sess, awsConfig,
cfg.Scope.NewSubScope("cloudwatch_scheduler"))
case common.Local:
fallthrough
default:
logger.Infof(context.Background(),
"Using default noop event scheduler implementation for cloud provider type [%s]",
cfg.EventSchedulerConfig.Scheme)
cfg.SchedulerConfig.EventSchedulerConfig.Scheme)
eventScheduler = noop.NewNoopEventScheduler()
}

switch cfg.WorkflowExecutorConfig.Scheme {
switch cfg.SchedulerConfig.WorkflowExecutorConfig.Scheme {
case common.AWS:
// Do nothing, this special case depends on the execution manager and launch plan manager having been
// initialized and is handled in GetWorkflowExecutor.
Expand All @@ -87,7 +96,7 @@ func NewWorkflowScheduler(cfg WorkflowSchedulerConfig) WorkflowScheduler {
default:
logger.Infof(context.Background(),
"Using default noop workflow executor implementation for cloud provider type [%s]",
cfg.EventSchedulerConfig.Scheme)
cfg.SchedulerConfig.EventSchedulerConfig.Scheme)
workflowExecutor = noop.NewWorkflowExecutor()
}
return &workflowScheduler{
Expand Down
Loading

0 comments on commit e2520eb

Please sign in to comment.