Conversation
pkg/rpc/adminservice/base.go
Outdated
@@ -98,11 +98,18 @@ func NewAdminServer(kubeConfig, master string) *AdminService {
	publisher := notifications.NewNotificationsPublisher(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope)
	processor := notifications.NewNotificationsProcessor(*configuration.ApplicationConfiguration().GetNotificationsConfig(), adminScope)
	go func() {
		logger.Info(context.Background(), "Started processing notifications.")
		err = processor.StartProcessing()
Shouldn't start processing auto-handle connection loss and reconnect failures? Of course it should log.
sure, refactored @kumare3
Codecov Report
@@            Coverage Diff             @@
##           master     #106      +/-   ##
==========================================
- Coverage   62.87%   62.60%   -0.27%
==========================================
  Files         101      102       +1
  Lines        7369     7418      +49
==========================================
+ Hits         4633     4644      +11
- Misses       2191     2228      +37
- Partials      545      546       +1
Continue to review full report at Codecov.
Looks good to me, just one thing: in case of a connection error, should the FlyteAdmin process not crash/exit?
I'm not sure - is a running admin with partial functionality better, or should we fail fast?
IMO, it could be a network partition or some other issue. If we fail fast, we might be able to recover on a different node? Also, since we have multiple replicas, the others should be fine. On the other hand, in sandbox mode we should probably not crash, as users may not be able to figure out what the problem is. I pinged Ruslan. I guess if we do it for this, we should probably do it for all other cases?
Updated so that we retry initializing the initial AWS/GCP client the number of times specified in the config, but otherwise retry indefinitely in case an open channel hiccups.
@@ -2,6 +2,9 @@ package notifications

import (
	"context"
	"time"

	"github.com/lyft/flyteadmin/pkg/async"
looks like a future flytestdlib entity
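For context, the new pkg/async import above presumably provides a small finite-retry helper along these lines. This is a minimal sketch under that assumption; the actual helper's name and signature in flyteadmin may differ.

package async

import (
	"time"
)

// Retry is a sketch of a finite retry helper: it calls f once plus up to
// `attempts` retries, sleeping `delay` before each retry, and returns the
// last error if every attempt fails.
func Retry(attempts int, delay time.Duration, f func() error) error {
	var err error
	for attempt := 0; attempt <= attempts; attempt++ {
		if attempt > 0 {
			time.Sleep(delay)
		}
		err = f()
		if err == nil {
			return nil
		}
	}
	return err
}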
pkg/async/notifications/factory.go
Outdated
@@ -37,11 +40,18 @@ type EmailerConfig struct {
 	BaseURL string
 }

-func GetEmailer(config runtimeInterfaces.NotificationsConfig, scope promutils.Scope) interfaces.Emailer {
+func GetEmailer(config runtimeInterfaces.NotificationsConfig, scope promutils.Scope,
+	reconnectAttempts int, reconnectDelay time.Duration) interfaces.Emailer {
 	switch config.Type {
 	case common.AWS:
 		awsConfig := aws.NewConfig().WithRegion(config.Region).WithMaxRetries(maxRetries)
It seems the AWS config already takes retries and a delay?
derp, thanks for the catch. updated
lgtm, thank you
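For reference, a minimal sketch of the updated AWS path being discussed: the configured attempt count is handed straight to the AWS config via WithMaxRetries, and the SDK's default retryer handles backoff between attempts. The helper name and SES wiring below are illustrative assumptions, not the PR's exact code.

package notifications

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/ses"
)

// newSESClient is a hypothetical helper: the reconnect attempt count from the
// notifications config is passed to the AWS config, and the SDK's default
// retryer takes care of the delay/backoff between retries.
func newSESClient(region string, reconnectAttempts int) (*ses.SES, error) {
	awsConfig := aws.NewConfig().WithRegion(region).WithMaxRetries(reconnectAttempts)
	awsSession, err := session.NewSession(awsConfig)
	if err != nil {
		return nil, err
	}
	return ses.New(awsSession), nil
}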
	for {
		logger.Warningf(context.Background(), "Starting notifications processor")
		err := p.run()
		logger.Errorf(context.Background(), "error with running processor err: [%v] ", err)
should we log a metric here too?
we already log a metric when the channel closes
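For illustration, a metric like that might be wired up roughly as follows with flytestdlib's promutils; the metric and struct names here are assumptions, not necessarily what this PR uses.

package notifications

import (
	"github.com/lyft/flytestdlib/promutils"
	"github.com/prometheus/client_golang/prometheus"
)

// processorMetrics sketches the counters a processor might keep.
type processorMetrics struct {
	Scope         promutils.Scope
	ChannelClosed prometheus.Counter
}

func newProcessorMetrics(scope promutils.Scope) processorMetrics {
	return processorMetrics{
		Scope: scope,
		// Incremented whenever the underlying subscription channel closes
		// and the processor has to reconnect.
		ChannelClosed: scope.MustNewCounter("channel_closed",
			"count of unexpected closes of the processing channel"),
	}
}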
@@ -168,6 +171,15 @@ func (e *workflowExecutor) formulateExecutionCreateRequest(
}

func (e *workflowExecutor) Run() {
	for {
		logger.Warningf(context.Background(), "Starting workflow executor")
		err := e.run()
and here
and this is a layer above the async auto-retry layer right? so if we exhaust all retries, then we chill for half an hour and then try again?
wow half an hour is a goof, meant it to be half a minute - will update
added a metric on channel close
"async retry is for initializing clients a finite amount of times, your comment is when the channel hiccups"
	// Number of times to attempt recreating a notifications processor client should there be any disruptions.
	ReconnectAttempts int `json:"reconnectAttempts"`
	// Specifies the time interval to wait before attempting to reconnect the notifications processor client.
	ReconnectDelaySeconds int `json:"reconnectDelaySeconds"`
Should this be an int? We can make it this if you want: https://github.com/lyft/flytestdlib/blob/537f86093d86af270aab300742ee1a56f2905885/config/duration.go#L10
Sure, but it's a question of how much granularity we need to expose - are people really going to configure retry delays on the order of many minutes or even hours?
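For comparison, a sketch of the two options under discussion: keep the plain int of seconds and convert it where the processor is wired up, or switch the field to flytestdlib's config.Duration. Struct and field names beyond those in the diff above are assumptions.

package interfaces

import (
	"time"

	"github.com/lyft/flytestdlib/config"
)

// Option 1: keep the int of seconds and convert at the call site.
type NotificationsProcessorConfig struct {
	ReconnectAttempts     int `json:"reconnectAttempts"`
	ReconnectDelaySeconds int `json:"reconnectDelaySeconds"`
}

func (c NotificationsProcessorConfig) ReconnectDelay() time.Duration {
	return time.Duration(c.ReconnectDelaySeconds) * time.Second
}

// Option 2: use flytestdlib's config.Duration, which unmarshals values such
// as "30s" or "1m" directly into a time.Duration.
type NotificationsProcessorConfigAlt struct {
	ReconnectAttempts int             `json:"reconnectAttempts"`
	ReconnectDelay    config.Duration `json:"reconnectDelay"`
}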
TL;DR
Add retries to notifications processor to handle underlying SQS connection failures.
In general, add a config-specified number of retries for initializing the initial AWS/GCP client, but otherwise retry indefinitely in case an open channel hiccups.
Type
Are all requirements met?
Complete description
How did you fix the bug, make the feature etc. Link to any design docs etc
Tracking Issue
flyteorg/flyte#376
Follow-up issue
NA