Skip to content

Commit

Permalink
feat: auto cleanup database based on size
Browse files Browse the repository at this point in the history
  • Loading branch information
ItsNotGoodName committed Jan 4, 2022
1 parent 368bdef commit 4ce3720
Show file tree
Hide file tree
Showing 13 changed files with 134 additions and 19 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ database: # Database to store past messages
type: bolt # Needs to be 'bolt' to be enabled, Default ''
db: /tmp/smtpbridge.db # Default '$HOME/.smtpbridge/smtpbridge.db'
attachments: /tmp/attachments # Default '$HOME/.smtpbridge/attachments'
size: 2147483648 # Max total size of attachments in bytes, Default 2147483648 (2 GiB)

http: # HTTP server that shows past messages
enable: true # Enable http server, default 'false'
Expand All @@ -51,7 +52,7 @@ http: # HTTP server that shows past messages
smtp: # SMTP server that receives emails
host: "127.0.0.1" # Host to listen on, default ''
port: 1025 # Port to listen on, default 1025
size: 26214400 # Max allowed size of email in bytes, default 26214400 (25 MB)
size: 26214400 # Max allowed size of email in bytes, default 26214400 (25 MiB)
auth: true # Enable auth, default 'false'
username: user # Default ''
password: 12345678 # Default ''
Expand Down
19 changes: 15 additions & 4 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ THE SOFTWARE.
package cmd

import (
"context"
"log"
"os"
"os/signal"
Expand Down Expand Up @@ -66,7 +67,7 @@ var serverCmd = &cobra.Command{
// Init service
authSVC := service.NewAuth(serverConfig)
bridgeSVC := service.NewBridge(serverConfig, endpointREPO)
messageSVC := service.NewMessage(attachmentREPO, messageREPO)
messageSVC := service.NewMessage(serverConfig, attachmentREPO, messageREPO)
endpointSVC := service.NewEndpoint(endpointREPO, messageSVC)

// Init app
Expand All @@ -92,17 +93,27 @@ var serverCmd = &cobra.Command{
smtpServer := smtp.New(serverConfig, smtpBackend)
go smtpServer.Start()

// Start background message service
done := make(chan struct{})
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
go messageSVC.Run(ctx, done)

// Wait for interrupt
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt)
<-stop
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
cancel()

// Close database
err := db.Close()
if err != nil {
log.Println("error closing database:", err)
}

// Wait for background message service
<-done

log.Println("server stopped")
},
}
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type ConfigDB struct {
Type string `json:"type" mapstructure:"type"`
DB string `json:"db" mapstructure:"db"`
Attachments string `json:"attachments" mapstructure:"attachments"`
Size int64 `json:"size" mapstructure:"size"`
}

func (db *ConfigDB) IsBolt() bool {
Expand Down Expand Up @@ -78,6 +79,7 @@ func New() *Config {
Type: "bolt",
DB: path.Join(rootPath, "smtpbridge.db"),
Attachments: path.Join(rootPath, "attachments"),
Size: 1024 * 1024 * 2048,
},
SMTP: ConfigSMTP{
Size: 1024 * 1024 * 25,
Expand Down
2 changes: 2 additions & 0 deletions core/attachment.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type (
GetData(att *Attachment) ([]byte, error)
// GetFS returns the attachment file system.
GetFS() fs.FS
// GetSizeAll returns the size of all attachments.
GetSizeAll() (int64, error)
// ListByMessage returns a list of attachments for a message without data.
ListByMessage(msg *Message) ([]Attachment, error)
// DeleteData deletes the data for an attachment.
Expand Down
7 changes: 7 additions & 0 deletions core/background.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package core

import "context"

type Background interface {
Run(ctx context.Context, done chan struct{})
}
6 changes: 6 additions & 0 deletions core/database.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package core

import "errors"

var (
ErrDatabaseCleanup = errors.New("database cleanup")
)

type Database interface {
Close() error
AttachmentRepository() AttachmentRepositoryPort
Expand Down
4 changes: 3 additions & 1 deletion core/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type (
LoadData(msg *Message) error
// UpdateStatus updates the status of a message.
UpdateStatus(msg *Message, status Status) error
// CleanUp removes messages and attachments if full.
CleanUp() error
}

MessageRepositoryPort interface {
Expand All @@ -60,7 +62,7 @@ type (
// Get returns a message by it's UUID.
Get(uuid string) (*Message, error)
// List messages.
List(limit, offset int) ([]Message, error)
List(limit, offset int, reverse bool) ([]Message, error)
// Update a message.
Update(msg *Message, updateFN func(msg *Message) (*Message, error)) error
// Delete a message.
Expand Down
18 changes: 9 additions & 9 deletions left/web/embed_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@ import (
"path"
)

func getTemplateFS() fs.FS {
cwd, err := os.Getwd()
var projectDIR string

func init() {
var err error
projectDIR, err = os.Getwd()
if err != nil {
panic(err)
}
}

return os.DirFS(path.Join(cwd, packageDIR, templateDIR))
func getTemplateFS() fs.FS {
return os.DirFS(path.Join(projectDIR, packageDIR, templateDIR))
}

func GetAssetFS() fs.FS {
cwd, err := os.Getwd()
if err != nil {
panic(err)
}

return os.DirFS(path.Join(cwd, packageDIR, assetDIR))
return os.DirFS(path.Join(projectDIR, packageDIR, assetDIR))
}

func (t *Templater) getTemplate(page string) *template.Template {
Expand Down
21 changes: 21 additions & 0 deletions right/repository/attachment.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package repository

import (
"io/fs"
"io/ioutil"
"log"
"os"
"path"
Expand Down Expand Up @@ -99,6 +100,26 @@ func (a *Attachment) GetData(att *core.Attachment) ([]byte, error) {
return data, nil
}

func (a *Attachment) GetSizeAll() (int64, error) {
if err := os.Chdir(a.attDir); err != nil {
return 0, err
}

files, err := ioutil.ReadDir(a.attDir)
if err != nil {
return 0, err
}

dirSize := int64(0)
for _, file := range files {
if file.Mode().IsRegular() {
dirSize += file.Size()
}
}

return dirSize, nil
}

func (a *Attachment) ListByMessage(msg *core.Message) ([]core.Attachment, error) {
var attsM []attachmentModel
err := a.db.Select(q.Eq("MessageUUID", msg.UUID)).Find(&attsM)
Expand Down
9 changes: 7 additions & 2 deletions right/repository/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,14 @@ func (m *Message) Update(msg *core.Message, updateFN func(msg *core.Message) (*c
return tx.Commit()
}

func (m *Message) List(limit, offset int) ([]core.Message, error) {
func (m *Message) List(limit, offset int, reverse bool) ([]core.Message, error) {
var msgsM []messageModel
err := m.db.Select().OrderBy("CreatedAt").Limit(limit).Skip(offset).Reverse().Find(&msgsM)
query := m.db.Select().OrderBy("CreatedAt").Limit(limit).Skip(offset)
if reverse {
query = query.Reverse()
}

err := query.Find(&msgsM)
if err != nil && err != storm.ErrNotFound {
return nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions right/repository/mock_attachment.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ func (a AttachmentMock) GetFS() fs.FS {
return a
}

func (a AttachmentMock) GetSizeAll() (int64, error) {
return 0, nil
}

func (AttachmentMock) ListByMessage(msg *core.Message) ([]core.Attachment, error) {
return []core.Attachment{}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion right/repository/mock_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (MessageMock) Get(uuid string) (*core.Message, error) {
return nil, core.ErrNotImplemented
}

func (MessageMock) List(limit, offset int) ([]core.Message, error) {
func (MessageMock) List(limit, offset int, reverse bool) ([]core.Message, error) {
return []core.Message{}, nil
}

Expand Down
56 changes: 55 additions & 1 deletion service/message.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
package service

import (
"context"
"fmt"
"log"
"time"

"github.com/ItsNotGoodName/smtpbridge/config"
"github.com/ItsNotGoodName/smtpbridge/core"
)

type Message struct {
attachmentREPO core.AttachmentRepositoryPort
messageREPO core.MessageRepositoryPort
size int64
}

func NewMessage(
cfg *config.Config,
attachmentREPO core.AttachmentRepositoryPort,
messageREPO core.MessageRepositoryPort,
) *Message {
return &Message{
attachmentREPO: attachmentREPO,
messageREPO: messageREPO,
size: cfg.DB.Size,
}
}

Expand Down Expand Up @@ -59,7 +68,7 @@ func (m *Message) Get(uuid string) (*core.Message, error) {
}

func (m *Message) List(limit, offset int) ([]core.Message, error) {
messages, err := m.messageREPO.List(limit, offset)
messages, err := m.messageREPO.List(limit, offset, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -92,3 +101,48 @@ func (m *Message) UpdateStatus(msg *core.Message, status core.Status) error {
return msg, nil
})
}

func (m *Message) CleanUp() error {
for {
size, err := m.attachmentREPO.GetSizeAll()
if err != nil {
return err
}
if size < m.size {
return nil
}

msgs, err := m.messageREPO.List(10, 0, false)
if err != nil {
return err
}
if len(msgs) == 0 {
return fmt.Errorf("%w: database over %d bytes, but no messages to delete", core.ErrDatabaseCleanup, size)
}

for i := range msgs {
err := m.messageREPO.Delete(&msgs[i])
if err != nil {
return err
}
}
}
}

func (m *Message) Run(ctx context.Context, done chan struct{}) {
log.Println("service.Message.Run: started")
t := time.NewTicker(time.Minute * 10)

for {
select {
case <-t.C:
if err := m.CleanUp(); err != nil {
log.Printf("service.Message.Run: %s", err)
}
case <-ctx.Done():
log.Println("service.Message.Run: stopped")
done <- struct{}{}
return
}
}
}

0 comments on commit 4ce3720

Please sign in to comment.