Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduced SMTP notification #5535

Merged
merged 33 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
b11f2cc
Introduced SMTP notification
robert-ulbrich-mercedes-benz Jul 1, 2024
3296a1a
Fixing lint issues
robert-ulbrich-mercedes-benz Jul 3, 2024
d6bc1a4
Fixing tests
robert-ulbrich-mercedes-benz Jul 4, 2024
a740c8e
Fixing lint
robert-ulbrich-mercedes-benz Jul 4, 2024
4be6a0e
Fixing lint
robert-ulbrich-mercedes-benz Jul 5, 2024
4353452
Fix linting issues
robert-ulbrich-mercedes-benz Jul 15, 2024
9aa1b3e
Merge branch 'refs/heads/master' into smtp-notifications
robert-ulbrich-mercedes-benz Jul 29, 2024
3d74ea9
Merge branch 'refs/heads/master' into smtp-notifications
robert-ulbrich-mercedes-benz Aug 16, 2024
cada9d8
Fix go mod
robert-ulbrich-mercedes-benz Aug 16, 2024
fccc16a
Merge branch 'refs/heads/master' into smtp-notifications
robert-ulbrich-mercedes-benz Sep 5, 2024
73bb305
Fixing location of no sec comment
robert-ulbrich-mercedes-benz Sep 5, 2024
c7901bc
Removing unused import
robert-ulbrich-mercedes-benz Sep 5, 2024
b265304
Running gci for file
robert-ulbrich-mercedes-benz Sep 9, 2024
eda4d3e
Implemented connection reuse for smtp emailer
robert-ulbrich-mercedes-benz Sep 10, 2024
445b979
Introduced SMTP notification
robert-ulbrich-mercedes-benz Jul 1, 2024
a720425
Fixing lint issues
robert-ulbrich-mercedes-benz Jul 3, 2024
0bf9654
Fixing tests
robert-ulbrich-mercedes-benz Jul 4, 2024
f513352
Fixing lint
robert-ulbrich-mercedes-benz Jul 4, 2024
1a0de4a
Fixing lint
robert-ulbrich-mercedes-benz Jul 5, 2024
dc88a99
Fix linting issues
robert-ulbrich-mercedes-benz Jul 15, 2024
b54a932
Fix go mod
robert-ulbrich-mercedes-benz Aug 16, 2024
5eac9ea
Fixing location of no sec comment
robert-ulbrich-mercedes-benz Sep 5, 2024
0926604
Removing unused import
robert-ulbrich-mercedes-benz Sep 5, 2024
9d72fac
Running gci for file
robert-ulbrich-mercedes-benz Sep 9, 2024
d995db7
Implemented connection reuse for smtp emailer
robert-ulbrich-mercedes-benz Sep 10, 2024
f04c0fd
Merge remote-tracking branch 'origin/smtp-notifications' into smtp-no…
robert-ulbrich-mercedes-benz Sep 11, 2024
5da05e1
Adapting interface of smtp_emailer
robert-ulbrich-mercedes-benz Sep 11, 2024
1050f4b
Fixing linter issue
robert-ulbrich-mercedes-benz Sep 12, 2024
6d0e4bf
Adding unit tests
robert-ulbrich-mercedes-benz Sep 12, 2024
39369ba
Merge branch 'refs/heads/master' into smtp-notifications
robert-ulbrich-mercedes-benz Sep 12, 2024
8be8c46
Applying gci
robert-ulbrich-mercedes-benz Sep 13, 2024
64f3333
Faking mockery generation
robert-ulbrich-mercedes-benz Sep 16, 2024
50ce320
Using mocker 1.0.1
robert-ulbrich-mercedes-benz Sep 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Implemented connection reuse for smtp emailer
Signed-off-by: Ulbrich Robert <robert.ulbrich@mercedes-benz.com>
  • Loading branch information
robert-ulbrich-mercedes-benz committed Sep 11, 2024
commit d995db797796a5bab865ae1ccf073700798bd552
53 changes: 34 additions & 19 deletions flyteadmin/pkg/async/notifications/implementations/smtp_emailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,61 +6,82 @@ import (
"net/smtp"
"strings"

"golang.org/x/net/context"
"google.golang.org/grpc/codes"

"github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
)

type SMTPEmailer struct {
config *runtimeInterfaces.NotificationsEmailerConfig
systemMetrics emailMetrics
tlsConf *tls.Config
auth *smtp.Auth
smtpClient *smtp.Client
}

func (s *SMTPEmailer) SendEmail(ctx context.Context, email admin.EmailMessage) error {

func (s *SMTPEmailer) createClient(ctx context.Context) (*smtp.Client, error) {
newClient, err := smtp.Dial(s.config.EmailerConfig.SMTPServer + ":" + s.config.EmailerConfig.SMTPPort)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm, this is the intended behavior? If admin needs to send a thousand emails, it has to make a new client each time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is intended behavior. When using the built-in email client of Go, most examples you find work that way. Also with the integration into the Notifications processor this is the simplest approach. Establishing a connection does not seem to be a very complex undertaking when looking at the source code of the email client.

I agree that keeping the connection is more efficient. But it also makes the code more complex, because the connection state needs to be maintained over an instance - so whenever an email is meant to be sent, it first needs to be checked if the connection is still alive.

If you want me to, I can refactor it to only open a connection once to the SMTP server.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just checked again the internal implementation of Golang's smtp client: It appears that the client implementation seems to be not multi-thread-safe. Every client instance keeps an internal state between function calls - so when reusing the same client while sending an email for writing another mail can lead to weird behavior.

My assumption is that there is only a single email processor per Flyte Admin instance. That means that there should be no concurrent email processing on any Flyte Admin instance, which makes it safe to reuse an existing Flyste client.


if err != nil {
return s.emailError(ctx, fmt.Sprintf("Error creating email client: %s", err))
return nil, s.emailError(ctx, fmt.Sprintf("Error creating email client: %s", err))
}

defer newClient.Close()

if err = newClient.Hello("localhost"); err != nil {
return s.emailError(ctx, fmt.Sprintf("Error initiating connection to SMTP server: %s", err))
return nil, s.emailError(ctx, fmt.Sprintf("Error initiating connection to SMTP server: %s", err))
}

if ok, _ := newClient.Extension("STARTTLS"); ok {
if err = newClient.StartTLS(s.tlsConf); err != nil {
return err
return nil, err
}
}

if ok, _ := newClient.Extension("AUTH"); ok {
if err = newClient.Auth(*s.auth); err != nil {
return s.emailError(ctx, fmt.Sprintf("Error authenticating email client: %s", err))
return nil, s.emailError(ctx, fmt.Sprintf("Error authenticating email client: %s", err))
}
}

if err = newClient.Mail(email.SenderEmail); err != nil {
return newClient, nil
}

func (s *SMTPEmailer) SendEmail(ctx context.Context, email admin.EmailMessage) error {

if s.smtpClient == nil || s.smtpClient.Noop() != nil {

if s.smtpClient != nil {
err := s.smtpClient.Close()
if err != nil {
logger.Info(ctx, err)
}
}
smtpClient, err := s.createClient(ctx)

if err != nil {
return s.emailError(ctx, fmt.Sprintf("Error creating SMTP email client: %s", err))
}

s.smtpClient = smtpClient
}

if err := s.smtpClient.Mail(email.SenderEmail); err != nil {
return s.emailError(ctx, fmt.Sprintf("Error creating email instance: %s", err))
}

for _, recipient := range email.RecipientsEmail {
if err = newClient.Rcpt(recipient); err != nil {
if err := s.smtpClient.Rcpt(recipient); err != nil {
logger.Errorf(ctx, "Error adding email recipient: %s", err)
}
}

writer, err := newClient.Data()
writer, err := s.smtpClient.Data()

if err != nil {
return s.emailError(ctx, fmt.Sprintf("Error adding email recipient: %s", err))
Expand All @@ -78,12 +99,6 @@ func (s *SMTPEmailer) SendEmail(ctx context.Context, email admin.EmailMessage) e
return s.emailError(ctx, fmt.Sprintf("Error closing mail body: %s", err))
}

err = newClient.Quit()

if err != nil {
return s.emailError(ctx, fmt.Sprintf("Error quitting mail agent: %s", err))
}

s.systemMetrics.SendSuccess.Inc()
return nil
}
Expand Down
3 changes: 1 addition & 2 deletions flyteadmin/pkg/server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"strings"
"time"

"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"

"github.com/gorilla/handlers"
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpcauth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
Expand Down Expand Up @@ -45,6 +43,7 @@ import (
"github.com/flyteorg/flyte/flyteidl/clients/go/assets"
grpcService "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/gateway/flyteidl/service"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/secretmanager"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
Expand Down