Skip to content

Commit

Permalink
feat: mailman_queue for mailman
Browse files Browse the repository at this point in the history
  • Loading branch information
ItsNotGoodName committed Sep 12, 2023
1 parent 615f058 commit 038bf27
Show file tree
Hide file tree
Showing 13 changed files with 280 additions and 37 deletions.
9 changes: 9 additions & 0 deletions cmd/smtpbridge/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ItsNotGoodName/smtpbridge/internal/database"
"github.com/ItsNotGoodName/smtpbridge/internal/file"
"github.com/ItsNotGoodName/smtpbridge/internal/mailman"
"github.com/ItsNotGoodName/smtpbridge/internal/models"
"github.com/ItsNotGoodName/smtpbridge/internal/repo"
"github.com/ItsNotGoodName/smtpbridge/migrations"
"github.com/ItsNotGoodName/smtpbridge/pkg/secret"
Expand Down Expand Up @@ -96,6 +97,14 @@ func run(flags *flag.FlagSet) lieut.Executor {
webFileStore := app.NewWebFileStore("apple-touch-icon.png", fmt.Sprintf("http://127.0.0.1:%d", cfg.HTTPPort))
app := app.New(db, fileStore, bus, cfg.Config, cfg.EndpointFactory, webFileStore)

// TODO: move this somewhere else
{
release := bus.OnEnvelopeCreated(func(ctx context.Context, evt models.EventEnvelopeCreated) error {
return app.MailmanEnqueue(ctx, evt.ID)
})
defer release()
}

// Supervisor
super := suture.New("root", suture.Spec{
EventHook: sutureext.EventHook(),
Expand Down
32 changes: 32 additions & 0 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"context"
"errors"
"fmt"
"io"
"time"
Expand Down Expand Up @@ -359,4 +360,35 @@ func (a App) RetentionPolicyRun(ctx context.Context, tracer trace.Tracer) error
return nil
}

func (a App) MailmanDequeue(ctx context.Context) (*models.Envelope, error) {
envelopeID, err := repo.MailmanDequeue(ctx, a.db)
if err != nil {
if errors.Is(err, repo.ErrNoRows) {
return nil, nil
}
return nil, err
}

env, err := repo.EnvelopeGet(ctx, a.db, envelopeID)
if err != nil {
if errors.Is(err, repo.ErrNoRows) {
return nil, nil
}
return nil, err
}

return &env, nil
}

func (a App) MailmanEnqueue(ctx context.Context, envelopeID int64) error {
err := repo.MailmanEnqueue(ctx, a.db, envelopeID)
if err != nil {
return err
}

a.bus.MailmanEnqueued(ctx)

return nil
}

var _ core.App = App{}
32 changes: 30 additions & 2 deletions internal/bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,15 @@ import (
"github.com/ItsNotGoodName/smtpbridge/internal/models"
"github.com/google/uuid"
"github.com/mustafaturan/bus/v3"
"github.com/rs/zerolog/log"
)

func logEmitErr(err error) {
if err != nil {
log.Err(err).Msg("Failed to emit bus event")
}
}

type generator struct{}

func (generator) Generate() string {
Expand All @@ -28,6 +35,7 @@ func New() (Bus, error) {
bus.RegisterTopics(
TopicEnvelopeCreated,
TopicEnvelopeDeleted,
TopicMailmanEnqueued,
)

return Bus{
Expand All @@ -38,11 +46,12 @@ func New() (Bus, error) {
const (
TopicEnvelopeCreated = "envelope.created"
TopicEnvelopeDeleted = "envelope.deleted"
TopicMailmanEnqueued = "mailman.enqueued"
)

// EnvelopeCreated implements core.Bus.
func (b Bus) EnvelopeCreated(ctx context.Context, id int64) {
b.bus.Emit(ctx, TopicEnvelopeCreated, id)
logEmitErr(b.bus.Emit(ctx, TopicEnvelopeCreated, id))
}

// OnEnvelopeCreated implements core.Bus.
Expand All @@ -64,7 +73,7 @@ func (b Bus) OnEnvelopeCreated(h func(ctx context.Context, evt models.EventEnvel

// EnvelopeDeleted implements core.Bus.
func (b Bus) EnvelopeDeleted(ctx context.Context) {
b.bus.Emit(ctx, TopicEnvelopeDeleted, nil)
logEmitErr(b.bus.Emit(ctx, TopicEnvelopeDeleted, nil))
}

// OnEnvelopeDeleted implements core.Bus.
Expand All @@ -81,4 +90,23 @@ func (b Bus) OnEnvelopeDeleted(h func(ctx context.Context, evt models.EventEnvel
return func() { b.bus.DeregisterHandler(key) }
}

// MailmanEnqueued implements core.Bus.
func (b Bus) MailmanEnqueued(ctx context.Context) {
logEmitErr(b.bus.Emit(ctx, TopicMailmanEnqueued, nil))
}

// OnMailmanEnqueued implements core.Bus.
func (b Bus) OnMailmanEnqueued(h func(ctx context.Context, evt models.EventMailmanEnqueued) error) func() {
key := uuid.NewString()

b.bus.RegisterHandler(key, bus.Handler{
Handle: func(ctx context.Context, e bus.Event) {
h(ctx, models.EventMailmanEnqueued{})
},
Matcher: TopicMailmanEnqueued,
})

return func() { b.bus.DeregisterHandler(key) }
}

var _ core.Bus = Bus{}
4 changes: 4 additions & 0 deletions internal/core/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type Bus interface {
OnEnvelopeCreated(func(ctx context.Context, evt models.EventEnvelopeCreated) error) func()
EnvelopeDeleted(ctx context.Context)
OnEnvelopeDeleted(func(ctx context.Context, evt models.EventEnvelopeDeleted) error) func()
MailmanEnqueued(ctx context.Context)
OnMailmanEnqueued(func(ctx context.Context, evt models.EventMailmanEnqueued) error) func()
}

type App interface {
Expand Down Expand Up @@ -46,4 +48,6 @@ type App interface {
TraceDrop(ctx context.Context) error
TraceList(ctx context.Context, page pagination.Page, req models.DTOTraceListRequest) (models.DTOTraceListResult, error)
Tracer(source string) trace.Tracer
MailmanEnqueue(ctx context.Context, envelopeID int64) error
MailmanDequeue(ctx context.Context) (*models.Envelope, error)
}
78 changes: 78 additions & 0 deletions internal/jet/table/mailman_queue.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/jet/table/table_use_schema.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 31 additions & 32 deletions internal/mailman/mailman.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package mailman

import (
"context"
"fmt"
"time"

"github.com/ItsNotGoodName/smtpbridge/internal/core"
Expand All @@ -18,7 +17,6 @@ type Mailman struct {
app core.App
fileStore endpoint.FileStore
endpointFactory endpoint.Factory
queueLimit int
}

func New(app core.App, bus core.Bus, fileStore endpoint.FileStore, endpointFactory endpoint.Factory) Mailman {
Expand All @@ -27,20 +25,15 @@ func New(app core.App, bus core.Bus, fileStore endpoint.FileStore, endpointFacto
bus: bus,
fileStore: fileStore,
endpointFactory: endpointFactory,
queueLimit: 100,
}
}

func (m Mailman) Serve(ctx context.Context) error {
idC := make(chan int64, m.queueLimit)
release := m.bus.OnEnvelopeCreated(func(ctx context.Context, evt models.EventEnvelopeCreated) error {
checkC := make(chan struct{}, 1)
release := m.bus.OnMailmanEnqueued(func(ctx context.Context, evt models.EventMailmanEnqueued) error {
select {
case idC <- evt.ID:
case checkC <- struct{}{}:
default:
m.app.Tracer(trace.SourceMailman).Trace(ctx,
"mailman.overflow",
trace.WithEnvelope(evt.ID),
trace.WithError(fmt.Errorf("mailman is full")))
}

return nil
Expand All @@ -51,36 +44,42 @@ func (m Mailman) Serve(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
case id := <-idC:
tracer := m.app.Tracer(trace.SourceMailman).
Sticky(trace.WithEnvelope(id))

tracer.Trace(ctx, "mailman.start")
err := m.send(ctx, tracer, id)
if err != nil {
tracer.Trace(ctx, "mailman.error", trace.WithError(err))
log.Err(err).Int64("envelope-id", id).Msg("Failed to send envelope")
case <-checkC:
for {
tracer := m.app.Tracer(trace.SourceMailman)

maybeEnv, err := m.app.MailmanDequeue(ctx)
if err != nil {
tracer.Trace(ctx, "mailman.dequeue", trace.WithError(err))
break
}
if maybeEnv == nil {
break
}
env := *maybeEnv

tracer = tracer.Sticky(trace.WithEnvelope(env.Message.ID))

tracer.Trace(ctx, "mailman.start")
if err := m.send(ctx, tracer, env); err != nil {
tracer.Trace(ctx, "mailman.error", trace.WithError(err))
log.Err(err).Int64("envelope-id", env.Message.ID).Msg("Failed to send envelope")
}
tracer.Trace(ctx, "mailman.end")
}
tracer.Trace(ctx, "mailman.end")
}
}
}

func (m Mailman) send(ctx context.Context, tracer trace.Tracer, envelopeID int64) error {
// Get envelope
env, err := m.app.EnvelopeGet(ctx, envelopeID)
if err != nil {
return err
}

func (m Mailman) send(ctx context.Context, tracer trace.Tracer, env models.Envelope) error {
// List all rules
rules, err := m.app.RuleEndpointsList(ctx)
if err != nil {
return err
}

if len(rules) == 0 {
tracer.Trace(ctx, "mailman.rules.skip.empty")
tracer.Trace(ctx, "mailman.rules.skip(empty)")
return nil
}

Expand All @@ -89,7 +88,7 @@ func (m Mailman) send(ctx context.Context, tracer trace.Tracer, envelopeID int64
tracer := tracer.Sticky(trace.WithRule(r.Rule.ID))

if len(r.Endpoints) == 0 {
tracer.Trace(ctx, "mailman.rule.endpoints.skip.empty")
tracer.Trace(ctx, "mailman.rule.endpoints.skip(empty)")
continue
}

Expand All @@ -111,14 +110,14 @@ func (m Mailman) send(ctx context.Context, tracer trace.Tracer, envelopeID int64
continue
}

tracer.Trace(ctx, "mailman.rule.match.pass")
tracer.Trace(ctx, "mailman.rule.match.success")

for _, e := range r.Endpoints {
tracer := tracer.Sticky(trace.WithEndpoint(e.ID))

// Prevent duplicate envelopes
if _, ok := sent[e.ID]; ok {
tracer.Trace(ctx, "mailman.rule.endpoint.skip.duplicate")
tracer.Trace(ctx, "mailman.rule.endpoint.skip(duplicate)")
continue
}
sent[e.ID] = struct{}{}
Expand All @@ -137,7 +136,7 @@ func (m Mailman) send(ctx context.Context, tracer trace.Tracer, envelopeID int64
if err != nil {
tracer.Trace(ctx, "mailman.rule.endpoint.send.error", trace.WithError(err), trace.WithDuration(time.Now().Sub(start)))
} else {
tracer.Trace(ctx, "mailman.rule.endpoint.send", trace.WithDuration(time.Now().Sub(start)))
tracer.Trace(ctx, "mailman.rule.endpoint.send.success", trace.WithDuration(time.Now().Sub(start)))
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions internal/models/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ type EventEnvelopeCreated struct {

type EventEnvelopeDeleted struct {
}

type EventMailmanEnqueued struct {
}
Loading

0 comments on commit 038bf27

Please sign in to comment.