Skip to content

Commit

Permalink
feat: MessageUpdated Update
Browse files Browse the repository at this point in the history
To be used in cases where one needs to atomically replace a message
(e.g: Drafts) in one go. This was previously possible with a combination
of Delete + Create update, but these were not guaranteed to be atomic
and could cause undefined behaviors if something else happened
in between.

The old logic to handle the replacement of delete messages on create has
been reverted. It is now expected that this type of behavior uses the
newly introduced update.
  • Loading branch information
LBeernaertProton committed Nov 3, 2022
1 parent 4093b99 commit 7b6e1df
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 27 deletions.
29 changes: 29 additions & 0 deletions connector/dummy_simulate.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,35 @@ func (conn *Dummy) MessagesCreated(messages []imap.Message, literals [][]byte, m
return nil
}

func (conn *Dummy) MessageUpdated(message imap.Message, literal []byte, mboxIDs []imap.MailboxID) error {
conn.state.lock.Lock()
defer conn.state.lock.Unlock()

parsedMessage, err := imap.NewParsedMessage(literal)
if err != nil {
return err
}

mboxIDMap := make(map[imap.MailboxID]struct{})

for _, mboxID := range mboxIDs {
mboxIDMap[mboxID] = struct{}{}
}

conn.state.messages[message.ID] = &dummyMessage{
literal: literal,
seen: message.Flags.Contains(imap.FlagSeen),
flagged: message.Flags.Contains(imap.FlagFlagged),
parsed: parsedMessage,
date: message.Date,
mboxIDs: mboxIDMap,
}

conn.pushUpdate(imap.NewMessageUpdated(message, literal, mboxIDs, parsedMessage))

return nil
}

func (conn *Dummy) MessageAdded(messageID imap.MessageID, mboxID imap.MailboxID) error {
conn.state.addMessageToMailbox(messageID, mboxID)

Expand Down
38 changes: 38 additions & 0 deletions imap/update_message_updated.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package imap

import (
"fmt"
"github.com/bradenaw/juniper/xslices"
)

// MessageUpdated replaces the previous behavior of MessageDelete followed by MessageCreate. Furthermore, it guarantees
// that the operation is executed atomically.
type MessageUpdated struct {
updateBase
*updateWaiter

Message Message
Literal []byte
MailboxIDs []MailboxID
ParsedMessage *ParsedMessage
}

func NewMessageUpdated(message Message, literal []byte, mailboxIDs []MailboxID, parsedMessage *ParsedMessage) *MessageUpdated {
return &MessageUpdated{
updateWaiter: newUpdateWaiter(),
Message: message,
Literal: literal,
MailboxIDs: mailboxIDs,
ParsedMessage: parsedMessage,
}
}

func (u *MessageUpdated) String() string {
return fmt.Sprintf("MessageUpdate: ID:%v Mailboxes:%v Flags:%s",
u.Message.ID.ShortID(),
xslices.Map(u.MailboxIDs, func(mboxID MailboxID) string {
return mboxID.ShortID()
}),
u.Message.Flags.ToSlice(),
)
}
120 changes: 98 additions & 22 deletions internal/backend/connector_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func (user *user) apply(ctx context.Context, update imap.Update) error {
case *imap.MessageDeleted:
return user.applyMessageDeleted(ctx, update)

case *imap.MessageUpdated:
return user.applyMessageUpdated(ctx, update)

case *imap.Noop:
return nil

Expand Down Expand Up @@ -167,28 +170,20 @@ func (user *user) applyMessagesCreated(ctx context.Context, update *imap.Message
messageForMBox := make(map[imap.InternalMailboxID][]imap.InternalMessageID)
mboxInternalIDMap := make(map[imap.MailboxID]imap.InternalMailboxID)

var messagesToDelete []imap.InternalMessageID

if err := user.db.Read(ctx, func(ctx context.Context, client *ent.Client) error {
for _, message := range update.Messages {
internalID, ok := messagesToCreateFilter[message.Message.ID]
if !ok {
existingMessage, err := db.GetMessageFromRemoteIDWithDeletedFlag(ctx, client, message.Message.ID)
if ent.IsNotFound(err) {
internalID = user.nextMessageID()
} else if err != nil {
exists, err := db.HasMessageWithRemoteID(ctx, client, message.Message.ID)
if err != nil {
return err
} else if existingMessage.Deleted {
// This message has been marked as delete, but we received a create with the same messageID, we
// need to delete the old message from the database first before we can insert the new one.
logrus.WithField("message-id", message.Message.ID.ShortID()).Debug("Message marked delete, will be replaced with new incoming message")
messagesToDelete = append(messagesToDelete, existingMessage.ID)
internalID = user.nextMessageID()
} else {
// Message exists, but has not been marked deleted, so we can't replace.
return nil
} else if exists {
// Message exists, can't be replaced
continue
}

internalID = user.nextMessageID()

literal, err := rfc822.SetHeaderValue(message.Literal, ids.InternalIDKey, internalID.String())
if err != nil {
return fmt.Errorf("failed to set internal ID: %w", err)
Expand Down Expand Up @@ -244,13 +239,6 @@ func (user *user) applyMessagesCreated(ctx context.Context, update *imap.Message
// assign them to the mailbox. There's an upper limit to the number of items badger can track in one transaction.
// This way we can keep the database consistent.
if err := user.db.Write(ctx, func(ctx context.Context, tx *ent.Tx) error {
// Delete messages that need to be replaced
for _, chunk := range xslices.Chunk(messagesToDelete, db.ChunkLimit) {
if err := db.DeleteMessages(ctx, tx, chunk...); err != nil {
return err
}
}

for _, chunk := range xslices.Chunk(messagesToCreate, db.ChunkLimit) {
// Create messages in the store
for _, msg := range chunk {
Expand Down Expand Up @@ -511,3 +499,91 @@ func (user *user) applyMessageDeleted(ctx context.Context, update *imap.MessageD

return nil
}

func (user *user) applyMessageUpdated(ctx context.Context, update *imap.MessageUpdated) error {
var stateUpdates []state.Update

if err := user.db.Write(ctx, func(ctx context.Context, tx *ent.Tx) error {
var exists bool

internalMessageID, err := db.GetMessageIDFromRemoteID(ctx, tx.Client(), update.Message.ID)
if err != nil {
if !ent.IsNotFound(err) {
return err
}
} else {
exists = true
}

// delete the message and remove from the mailboxes.
if exists {
mailboxes, err := db.GetMessageMailboxIDs(ctx, tx.Client(), internalMessageID)
if err != nil {
return err
}

messageIDs := []imap.InternalMessageID{internalMessageID}

for _, mailbox := range mailboxes {
updates, err := state.RemoveMessagesFromMailbox(ctx, tx, mailbox, messageIDs)
if err != nil {
return err
}

stateUpdates = append(stateUpdates, updates...)
}

if err := db.DeleteMessages(ctx, tx, internalMessageID); err != nil {
return err
}
}

// create new entry
{
newInternalID := user.nextMessageID()
literal, err := rfc822.SetHeaderValue(update.Literal, ids.InternalIDKey, newInternalID.String())
if err != nil {
return fmt.Errorf("failed to set internal ID: %w", err)
}

request := &db.CreateMessageReq{
Message: update.Message,
Literal: literal,
Body: update.ParsedMessage.Body,
Structure: update.ParsedMessage.Structure,
Envelope: update.ParsedMessage.Envelope,
InternalID: newInternalID,
}

if _, err := db.CreateMessages(ctx, tx, request); err != nil {
return err
}

if err := user.store.Set(newInternalID, literal); err != nil {
return err
}

for _, mbox := range update.MailboxIDs {
internalMBoxID, err := db.GetMailboxIDFromRemoteID(ctx, tx.Client(), mbox)
if err != nil {
return err
}

_, update, err := state.AddMessagesToMailbox(ctx, tx, internalMBoxID, []imap.InternalMessageID{newInternalID}, nil)
if err != nil {
return err
}

stateUpdates = append(stateUpdates, update)
}
}

return nil
}); err != nil {
return err
}

user.queueStateUpdate(stateUpdates...)

return nil
}
9 changes: 6 additions & 3 deletions internal/backend/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,14 @@ func (user *user) deleteAllMessagesMarkedDeleted(ctx context.Context) error {
return user.store.Delete(ids...)
}

func (user *user) queueStateUpdate(update state.Update) {
func (user *user) queueStateUpdate(updates ...state.Update) {
if err := user.forState(func(state *state.State) error {
if !state.QueueUpdates(update) {
logrus.Errorf("Failed to push update to state %v", state.StateID)
for _, update := range updates {
if !state.QueueUpdates(update) {
logrus.Errorf("Failed to push update to state %v", state.StateID)
}
}

return nil
}); err != nil {
panic("unexpected, should not happen")
Expand Down
3 changes: 1 addition & 2 deletions tests/deleted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,7 @@ func TestRemoteMessageUpdate(t *testing.T) {
c.S(`* STATUS "mbox1" (MESSAGES 1)`)
c.S(`A002 OK STATUS`)

s.messageDeleted("user", messageID)
s.messageCreatedWithID("user", messageID, mailboxID, []byte("To: [email protected]"), time.Now())
s.messageUpdatedWithID("user", messageID, mailboxID, []byte("To: [email protected]"), time.Now())
s.flush("user")

c.C(`A002 STATUS mbox1 (MESSAGES)`)
Expand Down
15 changes: 15 additions & 0 deletions tests/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Connector interface {

MessageCreated(imap.Message, []byte, []imap.MailboxID) error
MessagesCreated([]imap.Message, [][]byte, [][]imap.MailboxID) error
MessageUpdated(imap.Message, []byte, []imap.MailboxID) error
MessageAdded(imap.MessageID, imap.MailboxID) error
MessageRemoved(imap.MessageID, imap.MailboxID) error
MessageSeen(imap.MessageID, bool) error
Expand Down Expand Up @@ -204,6 +205,20 @@ func (s *testSession) messageCreatedWithID(user string, messageID imap.MessageID
s.conns[s.userIDs[user]].Flush()
}

func (s *testSession) messageUpdatedWithID(user string, messageID imap.MessageID, mailboxID imap.MailboxID, literal []byte, internalDate time.Time, flags ...string) {
require.NoError(s.tb, s.conns[s.userIDs[user]].MessageUpdated(
imap.Message{
ID: messageID,
Flags: imap.NewFlagSetFromSlice(flags),
Date: internalDate,
},
literal,
[]imap.MailboxID{mailboxID},
))

s.conns[s.userIDs[user]].Flush()
}

func (s *testSession) batchMessageCreated(user string, mailboxID imap.MailboxID, count int, createMessage func(int) ([]byte, []string)) []imap.MessageID {
return s.batchMessageCreatedWithID(user, mailboxID, count, func(i int) (imap.MessageID, []byte, []string) {
messageID := imap.MessageID(utils.NewRandomMessageID())
Expand Down

0 comments on commit 7b6e1df

Please sign in to comment.