Skip to content

Commit

Permalink
Implemented connection reuse for smtp emailer
Browse files Browse the repository at this point in the history
Signed-off-by: Ulbrich Robert <[email protected]>
  • Loading branch information
robert-ulbrich-mercedes-benz committed Sep 10, 2024
1 parent b265304 commit eda4d3e
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 21 deletions.
53 changes: 34 additions & 19 deletions flyteadmin/pkg/async/notifications/implementations/smtp_emailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,61 +6,82 @@ import (
"net/smtp"
"strings"

"golang.org/x/net/context"
"google.golang.org/grpc/codes"

"github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
)

type SMTPEmailer struct {
config *runtimeInterfaces.NotificationsEmailerConfig
systemMetrics emailMetrics
tlsConf *tls.Config
auth *smtp.Auth
smtpClient *smtp.Client
}

func (s *SMTPEmailer) SendEmail(ctx context.Context, email admin.EmailMessage) error {

func (s *SMTPEmailer) createClient(ctx context.Context) (*smtp.Client, error) {
newClient, err := smtp.Dial(s.config.EmailerConfig.SMTPServer + ":" + s.config.EmailerConfig.SMTPPort)

if err != nil {
return s.emailError(ctx, fmt.Sprintf("Error creating email client: %s", err))
return nil, s.emailError(ctx, fmt.Sprintf("Error creating email client: %s", err))
}

defer newClient.Close()

if err = newClient.Hello("localhost"); err != nil {
return s.emailError(ctx, fmt.Sprintf("Error initiating connection to SMTP server: %s", err))
return nil, s.emailError(ctx, fmt.Sprintf("Error initiating connection to SMTP server: %s", err))
}

if ok, _ := newClient.Extension("STARTTLS"); ok {
if err = newClient.StartTLS(s.tlsConf); err != nil {
return err
return nil, err
}
}

if ok, _ := newClient.Extension("AUTH"); ok {
if err = newClient.Auth(*s.auth); err != nil {
return s.emailError(ctx, fmt.Sprintf("Error authenticating email client: %s", err))
return nil, s.emailError(ctx, fmt.Sprintf("Error authenticating email client: %s", err))
}
}

if err = newClient.Mail(email.SenderEmail); err != nil {
return newClient, nil
}

func (s *SMTPEmailer) SendEmail(ctx context.Context, email admin.EmailMessage) error {

if s.smtpClient == nil || s.smtpClient.Noop() != nil {

if s.smtpClient != nil {
err := s.smtpClient.Close()
if err != nil {
logger.Info(ctx, err)
}
}
smtpClient, err := s.createClient(ctx)

if err != nil {
return s.emailError(ctx, fmt.Sprintf("Error creating SMTP email client: %s", err))
}

s.smtpClient = smtpClient
}

if err := s.smtpClient.Mail(email.SenderEmail); err != nil {
return s.emailError(ctx, fmt.Sprintf("Error creating email instance: %s", err))
}

for _, recipient := range email.RecipientsEmail {
if err = newClient.Rcpt(recipient); err != nil {
if err := s.smtpClient.Rcpt(recipient); err != nil {
logger.Errorf(ctx, "Error adding email recipient: %s", err)
}
}

writer, err := newClient.Data()
writer, err := s.smtpClient.Data()

if err != nil {
return s.emailError(ctx, fmt.Sprintf("Error adding email recipient: %s", err))
Expand All @@ -78,12 +99,6 @@ func (s *SMTPEmailer) SendEmail(ctx context.Context, email admin.EmailMessage) e
return s.emailError(ctx, fmt.Sprintf("Error closing mail body: %s", err))
}

err = newClient.Quit()

if err != nil {
return s.emailError(ctx, fmt.Sprintf("Error quitting mail agent: %s", err))
}

s.systemMetrics.SendSuccess.Inc()
return nil
}
Expand Down
3 changes: 1 addition & 2 deletions flyteadmin/pkg/server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"strings"
"time"

"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"

"github.com/gorilla/handlers"
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpcauth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
Expand Down Expand Up @@ -45,6 +43,7 @@ import (
"github.com/flyteorg/flyte/flyteidl/clients/go/assets"
grpcService "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/gateway/flyteidl/service"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/secretmanager"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
Expand Down

0 comments on commit eda4d3e

Please sign in to comment.