From 4ce3720f86cdaea68dc286a4903680cf125f099c Mon Sep 17 00:00:00 2001 From: ItsNotGoodName Date: Mon, 3 Jan 2022 17:18:24 -0800 Subject: [PATCH] feat: auto cleanup database based on size --- README.md | 3 +- cmd/server.go | 19 +++++++--- config/config.go | 2 ++ core/attachment.go | 2 ++ core/background.go | 7 ++++ core/database.go | 6 ++++ core/message.go | 4 ++- left/web/embed_dev.go | 18 +++++----- right/repository/attachment.go | 21 +++++++++++ right/repository/message.go | 9 +++-- right/repository/mock_attachment.go | 4 +++ right/repository/mock_message.go | 2 +- service/message.go | 56 ++++++++++++++++++++++++++++- 13 files changed, 134 insertions(+), 19 deletions(-) create mode 100644 core/background.go diff --git a/README.md b/README.md index 5a892a40..e408c5a6 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,7 @@ database: # Database to store past messages type: bolt # Needs to be 'bolt' to be enabled, Default '' db: /tmp/smtpbridge.db # Default '$HOME/.smtpbridge/smtpbridge.db' attachments: /tmp/attachments # Default '$HOME/.smtpbridge/attachments' + size: 2147483648 # Max total size of attachments in bytes, Default 2147483648 (2 GiB) http: # HTTP server that shows past messages enable: true # Enable http server, default 'false' @@ -51,7 +52,7 @@ http: # HTTP server that shows past messages smtp: # SMTP server that receives emails host: "127.0.0.1" # Host to listen on, default '' port: 1025 # Port to listen on, default 1025 - size: 26214400 # Max allowed size of email in bytes, default 26214400 (25 MB) + size: 26214400 # Max allowed size of email in bytes, default 26214400 (25 MiB) auth: true # Enable auth, default 'false' username: user # Default '' password: 12345678 # Default '' diff --git a/cmd/server.go b/cmd/server.go index 00f2bd06..8613a8d8 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -22,6 +22,7 @@ THE SOFTWARE. package cmd import ( + "context" "log" "os" "os/signal" @@ -66,7 +67,7 @@ var serverCmd = &cobra.Command{ // Init service authSVC := service.NewAuth(serverConfig) bridgeSVC := service.NewBridge(serverConfig, endpointREPO) - messageSVC := service.NewMessage(attachmentREPO, messageREPO) + messageSVC := service.NewMessage(serverConfig, attachmentREPO, messageREPO) endpointSVC := service.NewEndpoint(endpointREPO, messageSVC) // Init app @@ -92,10 +93,17 @@ var serverCmd = &cobra.Command{ smtpServer := smtp.New(serverConfig, smtpBackend) go smtpServer.Start() + // Start background message service + done := make(chan struct{}) + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + go messageSVC.Run(ctx, done) + // Wait for interrupt - stop := make(chan os.Signal, 1) - signal.Notify(stop, os.Interrupt) - <-stop + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + <-c + cancel() // Close database err := db.Close() @@ -103,6 +111,9 @@ var serverCmd = &cobra.Command{ log.Println("error closing database:", err) } + // Wait for background message service + <-done + log.Println("server stopped") }, } diff --git a/config/config.go b/config/config.go index 8c43b097..b0beb739 100644 --- a/config/config.go +++ b/config/config.go @@ -36,6 +36,7 @@ type ConfigDB struct { Type string `json:"type" mapstructure:"type"` DB string `json:"db" mapstructure:"db"` Attachments string `json:"attachments" mapstructure:"attachments"` + Size int64 `json:"size" mapstructure:"size"` } func (db *ConfigDB) IsBolt() bool { @@ -78,6 +79,7 @@ func New() *Config { Type: "bolt", DB: path.Join(rootPath, "smtpbridge.db"), Attachments: path.Join(rootPath, "attachments"), + Size: 1024 * 1024 * 2048, }, SMTP: ConfigSMTP{ Size: 1024 * 1024 * 25, diff --git a/core/attachment.go b/core/attachment.go index d2506f6a..5c6f3dbb 100644 --- a/core/attachment.go +++ b/core/attachment.go @@ -39,6 +39,8 @@ type ( GetData(att *Attachment) ([]byte, error) // GetFS returns the attachment file system. GetFS() fs.FS + // GetSizeAll returns the size of all attachments. + GetSizeAll() (int64, error) // ListByMessage returns a list of attachments for a message without data. ListByMessage(msg *Message) ([]Attachment, error) // DeleteData deletes the data for an attachment. diff --git a/core/background.go b/core/background.go new file mode 100644 index 00000000..4262b736 --- /dev/null +++ b/core/background.go @@ -0,0 +1,7 @@ +package core + +import "context" + +type Background interface { + Run(ctx context.Context, done chan struct{}) +} diff --git a/core/database.go b/core/database.go index e4a44df7..e573ce7b 100644 --- a/core/database.go +++ b/core/database.go @@ -1,5 +1,11 @@ package core +import "errors" + +var ( + ErrDatabaseCleanup = errors.New("database cleanup") +) + type Database interface { Close() error AttachmentRepository() AttachmentRepositoryPort diff --git a/core/message.go b/core/message.go index a29d1577..679187a7 100644 --- a/core/message.go +++ b/core/message.go @@ -50,6 +50,8 @@ type ( LoadData(msg *Message) error // UpdateStatus updates the status of a message. UpdateStatus(msg *Message, status Status) error + // CleanUp removes messages and attachments if full. + CleanUp() error } MessageRepositoryPort interface { @@ -60,7 +62,7 @@ type ( // Get returns a message by it's UUID. Get(uuid string) (*Message, error) // List messages. - List(limit, offset int) ([]Message, error) + List(limit, offset int, reverse bool) ([]Message, error) // Update a message. Update(msg *Message, updateFN func(msg *Message) (*Message, error)) error // Delete a message. diff --git a/left/web/embed_dev.go b/left/web/embed_dev.go index 3326f2bc..50f3adec 100644 --- a/left/web/embed_dev.go +++ b/left/web/embed_dev.go @@ -9,22 +9,22 @@ import ( "path" ) -func getTemplateFS() fs.FS { - cwd, err := os.Getwd() +var projectDIR string + +func init() { + var err error + projectDIR, err = os.Getwd() if err != nil { panic(err) } +} - return os.DirFS(path.Join(cwd, packageDIR, templateDIR)) +func getTemplateFS() fs.FS { + return os.DirFS(path.Join(projectDIR, packageDIR, templateDIR)) } func GetAssetFS() fs.FS { - cwd, err := os.Getwd() - if err != nil { - panic(err) - } - - return os.DirFS(path.Join(cwd, packageDIR, assetDIR)) + return os.DirFS(path.Join(projectDIR, packageDIR, assetDIR)) } func (t *Templater) getTemplate(page string) *template.Template { diff --git a/right/repository/attachment.go b/right/repository/attachment.go index 34ff81b9..1eff5bf6 100644 --- a/right/repository/attachment.go +++ b/right/repository/attachment.go @@ -2,6 +2,7 @@ package repository import ( "io/fs" + "io/ioutil" "log" "os" "path" @@ -99,6 +100,26 @@ func (a *Attachment) GetData(att *core.Attachment) ([]byte, error) { return data, nil } +func (a *Attachment) GetSizeAll() (int64, error) { + if err := os.Chdir(a.attDir); err != nil { + return 0, err + } + + files, err := ioutil.ReadDir(a.attDir) + if err != nil { + return 0, err + } + + dirSize := int64(0) + for _, file := range files { + if file.Mode().IsRegular() { + dirSize += file.Size() + } + } + + return dirSize, nil +} + func (a *Attachment) ListByMessage(msg *core.Message) ([]core.Attachment, error) { var attsM []attachmentModel err := a.db.Select(q.Eq("MessageUUID", msg.UUID)).Find(&attsM) diff --git a/right/repository/message.go b/right/repository/message.go index 8fe00e19..f8b93131 100644 --- a/right/repository/message.go +++ b/right/repository/message.go @@ -100,9 +100,14 @@ func (m *Message) Update(msg *core.Message, updateFN func(msg *core.Message) (*c return tx.Commit() } -func (m *Message) List(limit, offset int) ([]core.Message, error) { +func (m *Message) List(limit, offset int, reverse bool) ([]core.Message, error) { var msgsM []messageModel - err := m.db.Select().OrderBy("CreatedAt").Limit(limit).Skip(offset).Reverse().Find(&msgsM) + query := m.db.Select().OrderBy("CreatedAt").Limit(limit).Skip(offset) + if reverse { + query = query.Reverse() + } + + err := query.Find(&msgsM) if err != nil && err != storm.ErrNotFound { return nil, err } diff --git a/right/repository/mock_attachment.go b/right/repository/mock_attachment.go index b88ce084..e2ea2e11 100644 --- a/right/repository/mock_attachment.go +++ b/right/repository/mock_attachment.go @@ -28,6 +28,10 @@ func (a AttachmentMock) GetFS() fs.FS { return a } +func (a AttachmentMock) GetSizeAll() (int64, error) { + return 0, nil +} + func (AttachmentMock) ListByMessage(msg *core.Message) ([]core.Attachment, error) { return []core.Attachment{}, nil } diff --git a/right/repository/mock_message.go b/right/repository/mock_message.go index 074bb2f2..d5c30184 100644 --- a/right/repository/mock_message.go +++ b/right/repository/mock_message.go @@ -20,7 +20,7 @@ func (MessageMock) Get(uuid string) (*core.Message, error) { return nil, core.ErrNotImplemented } -func (MessageMock) List(limit, offset int) ([]core.Message, error) { +func (MessageMock) List(limit, offset int, reverse bool) ([]core.Message, error) { return []core.Message{}, nil } diff --git a/service/message.go b/service/message.go index e464f82f..ca9cde52 100644 --- a/service/message.go +++ b/service/message.go @@ -1,21 +1,30 @@ package service import ( + "context" + "fmt" + "log" + "time" + + "github.com/ItsNotGoodName/smtpbridge/config" "github.com/ItsNotGoodName/smtpbridge/core" ) type Message struct { attachmentREPO core.AttachmentRepositoryPort messageREPO core.MessageRepositoryPort + size int64 } func NewMessage( + cfg *config.Config, attachmentREPO core.AttachmentRepositoryPort, messageREPO core.MessageRepositoryPort, ) *Message { return &Message{ attachmentREPO: attachmentREPO, messageREPO: messageREPO, + size: cfg.DB.Size, } } @@ -59,7 +68,7 @@ func (m *Message) Get(uuid string) (*core.Message, error) { } func (m *Message) List(limit, offset int) ([]core.Message, error) { - messages, err := m.messageREPO.List(limit, offset) + messages, err := m.messageREPO.List(limit, offset, true) if err != nil { return nil, err } @@ -92,3 +101,48 @@ func (m *Message) UpdateStatus(msg *core.Message, status core.Status) error { return msg, nil }) } + +func (m *Message) CleanUp() error { + for { + size, err := m.attachmentREPO.GetSizeAll() + if err != nil { + return err + } + if size < m.size { + return nil + } + + msgs, err := m.messageREPO.List(10, 0, false) + if err != nil { + return err + } + if len(msgs) == 0 { + return fmt.Errorf("%w: database over %d bytes, but no messages to delete", core.ErrDatabaseCleanup, size) + } + + for i := range msgs { + err := m.messageREPO.Delete(&msgs[i]) + if err != nil { + return err + } + } + } +} + +func (m *Message) Run(ctx context.Context, done chan struct{}) { + log.Println("service.Message.Run: started") + t := time.NewTicker(time.Minute * 10) + + for { + select { + case <-t.C: + if err := m.CleanUp(); err != nil { + log.Printf("service.Message.Run: %s", err) + } + case <-ctx.Done(): + log.Println("service.Message.Run: stopped") + done <- struct{}{} + return + } + } +}