Skip to content

Commit

Permalink
fix(GODT-1757): Recent flag behavior and snapshot db reads
Browse files Browse the repository at this point in the history
This patch removes the need for the snapshots to read new messages from
the database when they arrive via an `exists` responder.

We have also updated the behavior of the `Recent` flag to match
dovecot's implementation. If a client append a message to a selected
mailbox only that client will receive the recent flags. If a client
appends a message to a non-selected mailbox, the first state which
receives the update will apply the recent flag.

To achieve the latter, we added a new update `ExistsStateUpdate` which
enforces the logic described above in a thread safe manner.
  • Loading branch information
LBeernaertProton committed Sep 7, 2022
1 parent 81b1a3a commit 9c7cb48
Show file tree
Hide file tree
Showing 14 changed files with 299 additions and 98 deletions.
1 change: 1 addition & 0 deletions internal/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (b *Backend) GetState(username, password string, sessionID int) (*state.Sta
logrus.
WithField("userID", userID).
WithField("username", username).
WithField("stateID", state.StateID).
Debug("Created new IMAP state")

return state, nil
Expand Down
9 changes: 3 additions & 6 deletions internal/backend/connector_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,8 @@ func (user *user) applyMessagesCreated(ctx context.Context, update *imap.Message
return err
}

responders := xslices.Map(messageIDs, func(messageID ids.MessageIDPair) state.Responder {
return state.NewExists(messageID.InternalID, messageUIDs[messageID.InternalID])
})
user.queueStateUpdate(state.NewExistsStateUpdate(internalMailboxID, messageIDs, messageUIDs, nil))

user.queueStateUpdate(state.NewMailboxIDResponderStateUpdate(internalMailboxID, responders...))
}

return nil
Expand Down Expand Up @@ -347,8 +344,8 @@ func (user *user) setMessageMailboxes(ctx context.Context, tx *ent.Tx, messageID
}

// applyMessagesAddedToMailbox adds the messages to the given mailbox.
func (user *user) applyMessagesAddedToMailbox(ctx context.Context, tx *ent.Tx, mboxID imap.InternalMailboxID, messageIDs []imap.InternalMessageID) (map[imap.InternalMessageID]int, error) {
messageUIDs, update, err := state.AddMessagesToMailbox(ctx, tx, mboxID, messageIDs)
func (user *user) applyMessagesAddedToMailbox(ctx context.Context, tx *ent.Tx, mboxID imap.InternalMailboxID, messageIDs []imap.InternalMessageID) (map[imap.InternalMessageID]*ent.UID, error) {
messageUIDs, update, err := state.AddMessagesToMailbox(ctx, tx, mboxID, messageIDs, nil)
if err != nil {
return nil, err
}
Expand Down
51 changes: 47 additions & 4 deletions internal/db/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func CreateMessages(ctx context.Context, tx *ent.Tx, reqs ...*CreateMessageReq)
})...).Save(ctx)
}

func AddMessagesToMailbox(ctx context.Context, tx *ent.Tx, messageIDs []imap.InternalMessageID, mboxID imap.InternalMailboxID) (map[imap.InternalMessageID]int, error) {
func AddMessagesToMailbox(ctx context.Context, tx *ent.Tx, messageIDs []imap.InternalMessageID, mboxID imap.InternalMailboxID) (map[imap.InternalMessageID]*ent.UID, error) {
messageUIDs := make(map[imap.InternalMessageID]int)

mbox, err := tx.Mailbox.Query().Where(mailbox.MailboxID(mboxID)).Only(ctx)
Expand Down Expand Up @@ -88,10 +88,10 @@ func AddMessagesToMailbox(ctx context.Context, tx *ent.Tx, messageIDs []imap.Int
return nil, err
}

return messageUIDs, nil
return GetMessageUIDsWithFlags(ctx, tx.Client(), mboxID, messageIDs)
}

func BumpMailboxUIDsForMessage(ctx context.Context, tx *ent.Tx, messageIDs []imap.InternalMessageID, mboxID imap.InternalMailboxID) (map[imap.InternalMessageID]int, error) {
func BumpMailboxUIDsForMessage(ctx context.Context, tx *ent.Tx, messageIDs []imap.InternalMessageID, mboxID imap.InternalMailboxID) (map[imap.InternalMessageID]*ent.UID, error) {
messageUIDs := make(map[imap.InternalMessageID]int)

mbox, err := tx.Mailbox.Query().Where(mailbox.MailboxID(mboxID)).Only(ctx)
Expand Down Expand Up @@ -121,7 +121,7 @@ func BumpMailboxUIDsForMessage(ctx context.Context, tx *ent.Tx, messageIDs []ima
return nil, err
}

return messageUIDs, nil
return GetMessageUIDsWithFlags(ctx, tx.Client(), mboxID, messageIDs)
}

func RemoveMessagesFromMailbox(ctx context.Context, tx *ent.Tx, messageIDs []imap.InternalMessageID, mboxID imap.InternalMailboxID) error {
Expand Down Expand Up @@ -207,6 +207,7 @@ func GetMessageUIDs(ctx context.Context, client *ent.Client, mboxID imap.Interna
).
WithMessage(func(query *ent.MessageQuery) {
query.Select(message.FieldMessageID)
query.Select(message.FieldRemoteID)
}).
All(ctx)
if err != nil {
Expand All @@ -222,6 +223,32 @@ func GetMessageUIDs(ctx context.Context, client *ent.Client, mboxID imap.Interna
return res, nil
}

func GetMessageUIDsWithFlags(ctx context.Context, client *ent.Client, mboxID imap.InternalMailboxID, messageIDs []imap.InternalMessageID) (map[imap.InternalMessageID]*ent.UID, error) {
messageUIDs, err := client.UID.Query().
Where(
uid.HasMailboxWith(mailbox.MailboxID(mboxID)),
uid.HasMessageWith(message.MessageIDIn(messageIDs...)),
).
WithMessage(func(query *ent.MessageQuery) {
query.Select(message.FieldMessageID, message.FieldRemoteID)
query.WithFlags(func(query *ent.MessageFlagQuery) {
query.Select(messageflag.FieldValue)
})
}).
All(ctx)
if err != nil {
return nil, err
}

res := make(map[imap.InternalMessageID]*ent.UID)

for _, messageUID := range messageUIDs {
res[messageUID.Edges.Message.MessageID] = messageUID
}

return res, nil
}

// GetMessageFlags returns the flags of the given messages.
// It does not include per-mailbox flags (\Deleted, \Recent)!
func GetMessageFlags(ctx context.Context, client *ent.Client, messageIDs []imap.InternalMessageID) (map[imap.InternalMessageID]imap.FlagSet, error) {
Expand Down Expand Up @@ -467,3 +494,19 @@ func GetMessageIDFromRemoteID(ctx context.Context, client *ent.Client, id imap.M

return message.MessageID, nil
}

func NewFlagSet(msgUID *ent.UID, flags []*ent.MessageFlag) imap.FlagSet {
flagSet := imap.NewFlagSetFromSlice(xslices.Map(flags, func(flag *ent.MessageFlag) string {
return flag.Value
}))

if msgUID.Deleted {
flagSet = flagSet.Add(imap.FlagDeleted)
}

if msgUID.Recent {
flagSet = flagSet.Add(imap.FlagRecent)
}

return flagSet
}
14 changes: 7 additions & 7 deletions internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ func (s *Session) serve(ctx context.Context) error {

for {
select {
case stateUpdate := <-s.state.GetStateUpdatesCh():
if err := s.state.ApplyUpdate(ctx, stateUpdate); err != nil {
logrus.WithError(err).Error("Failed to apply state update")
}

continue

case res, ok := <-cmdCh:
if !ok {
logrus.Debugf("Failed to read from command channel")
Expand All @@ -163,13 +170,6 @@ func (s *Session) serve(ctx context.Context) error {
case <-s.state.Done():
return nil

case stateUpdate := <-s.state.GetStateUpdatesCh():
if err := s.state.ApplyUpdate(ctx, stateUpdate); err != nil {
logrus.WithError(err).Error("Failed to apply state update")
}

continue

case <-ctx.Done():
return ctx.Err()
}
Expand Down
35 changes: 29 additions & 6 deletions internal/state/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func (state *State) actionCreateMessage(
literal []byte,
flags imap.FlagSet,
date time.Time,
isSelectedMailbox bool,
) (int, error) {
internalID, res, err := state.user.GetRemote().CreateMessage(ctx, mboxID.RemoteID, literal, flags, date)
if err != nil {
Expand Down Expand Up @@ -136,19 +137,35 @@ func (state *State) actionCreateMessage(
return 0, err
}

if err := state.user.QueueOrApplyStateUpdate(ctx, tx, NewMailboxIDResponderStateUpdate(mboxID.InternalID, NewExists(internalID, messageUIDs[internalID]))); err != nil {
// We can append to non-selected mailboxes.
var st *State
if isSelectedMailbox {
st = state
}

uid := messageUIDs[internalID]
if err := state.user.QueueOrApplyStateUpdate(
ctx,
tx,
newExistsStateUpdateWithExists(
mboxID.InternalID,
[]*exists{newExists(ids.MessageIDPair{InternalID: internalID, RemoteID: res.ID}, uid.UID, db.NewFlagSet(uid, uid.Edges.Message.Edges.Flags))},
st,
),
); err != nil {
return 0, err
}

return messageUIDs[internalID], nil
return messageUIDs[internalID].UID, nil
}

func (state *State) actionAddMessagesToMailbox(
ctx context.Context,
tx *ent.Tx,
messageIDs []ids.MessageIDPair,
mboxID ids.MailboxIDPair,
) (map[imap.InternalMessageID]int, error) {
isMailboxSelected bool,
) (map[imap.InternalMessageID]*ent.UID, error) {
var haveMessageIDs []ids.MessageIDPair
if state.snap != nil && state.snap.mboxID.InternalID == mboxID.InternalID {
haveMessageIDs = state.snap.getAllMessageIDs()
Expand All @@ -175,7 +192,13 @@ func (state *State) actionAddMessagesToMailbox(
return nil, err
}

messageUIDs, update, err := AddMessagesToMailbox(ctx, tx, mboxID.InternalID, internalIDs)
// Messages can be added to a mailbox that is not selected.
var st *State
if isMailboxSelected {
st = state
}

messageUIDs, update, err := AddMessagesToMailbox(ctx, tx, mboxID.InternalID, internalIDs, st)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -227,7 +250,7 @@ func (state *State) actionMoveMessages(
tx *ent.Tx,
messageIDs []ids.MessageIDPair,
mboxFromID, mboxToID ids.MailboxIDPair,
) (map[imap.InternalMessageID]int, error) {
) (map[imap.InternalMessageID]*ent.UID, error) {
if mboxFromID.InternalID == mboxToID.InternalID {
internalIDs, _ := ids.SplitMessageIDPairSlice(messageIDs)

Expand Down Expand Up @@ -272,7 +295,7 @@ func (state *State) actionMoveMessages(
return nil, err
}

messageUIDs, updates, err := MoveMessagesFromMailbox(ctx, tx, mboxFromID.InternalID, mboxToID.InternalID, internalIDs)
messageUIDs, updates, err := MoveMessagesFromMailbox(ctx, tx, mboxFromID.InternalID, mboxToID.InternalID, internalIDs, state)
if err != nil {
return nil, err
}
Expand Down
18 changes: 9 additions & 9 deletions internal/state/mailbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,17 @@ func (m *Mailbox) Append(ctx context.Context, literal []byte, flags imap.FlagSet
return db.HasMessageWithID(ctx, client, msgID)
}); err != nil || !exists {
logrus.WithError(err).Warn("The message has an unknown internal ID")
} else if res, err := db.WriteResult(ctx, m.state.db(), func(ctx context.Context, tx *ent.Tx) (map[imap.InternalMessageID]int, error) {
return m.state.actionAddMessagesToMailbox(ctx, tx, []ids.MessageIDPair{ids.NewMessageIDPairWithoutRemote(msgID)}, ids.NewMailboxIDPair(m.mbox))
} else if res, err := db.WriteResult(ctx, m.state.db(), func(ctx context.Context, tx *ent.Tx) (map[imap.InternalMessageID]*ent.UID, error) {
return m.state.actionAddMessagesToMailbox(ctx, tx, []ids.MessageIDPair{ids.NewMessageIDPairWithoutRemote(msgID)}, ids.NewMailboxIDPair(m.mbox), m.snap == m.state.snap)
}); err != nil {
return 0, err
} else {
return res[msgID], nil
return res[msgID].UID, nil
}
}

return db.WriteAndStoreResult(ctx, m.state.db(), m.state.user.GetStore(), func(ctx context.Context, tx *ent.Tx, transaction store.Transaction) (int, error) {
return m.state.actionCreateMessage(ctx, tx, transaction, m.snap.mboxID, literal, flags, date)
return m.state.actionCreateMessage(ctx, tx, transaction, m.snap.mboxID, literal, flags, date, m.snap == m.state.snap)
})
}

Expand Down Expand Up @@ -176,8 +176,8 @@ func (m *Mailbox) Copy(ctx context.Context, seq *proto.SequenceSet, name string)
return msg.UID
})

destUIDs, err := db.WriteResult(ctx, m.state.db(), func(ctx context.Context, tx *ent.Tx) (map[imap.InternalMessageID]int, error) {
return m.state.actionAddMessagesToMailbox(ctx, tx, msgIDs, ids.NewMailboxIDPair(mbox))
destUIDs, err := db.WriteResult(ctx, m.state.db(), func(ctx context.Context, tx *ent.Tx) (map[imap.InternalMessageID]*ent.UID, error) {
return m.state.actionAddMessagesToMailbox(ctx, tx, msgIDs, ids.NewMailboxIDPair(mbox), m.snap == m.state.snap)
})
if err != nil {
return nil, err
Expand All @@ -187,7 +187,7 @@ func (m *Mailbox) Copy(ctx context.Context, seq *proto.SequenceSet, name string)

if len(destUIDs) > 0 {
res = response.ItemCopyUID(mbox.UIDValidity, msgUIDs, xslices.Map(maps.Keys(destUIDs), func(messageID imap.InternalMessageID) int {
return destUIDs[messageID]
return destUIDs[messageID].UID
}))
}

Expand Down Expand Up @@ -218,7 +218,7 @@ func (m *Mailbox) Move(ctx context.Context, seq *proto.SequenceSet, name string)
return msg.UID
})

destUIDs, err := db.WriteResult(ctx, m.state.db(), func(ctx context.Context, tx *ent.Tx) (map[imap.InternalMessageID]int, error) {
destUIDs, err := db.WriteResult(ctx, m.state.db(), func(ctx context.Context, tx *ent.Tx) (map[imap.InternalMessageID]*ent.UID, error) {
return m.state.actionMoveMessages(ctx, tx, msgIDs, m.snap.mboxID, ids.NewMailboxIDPair(mbox))
})
if err != nil {
Expand All @@ -229,7 +229,7 @@ func (m *Mailbox) Move(ctx context.Context, seq *proto.SequenceSet, name string)

if len(destUIDs) > 0 {
res = response.ItemCopyUID(mbox.UIDValidity, msgUIDs, xslices.Map(maps.Keys(destUIDs), func(messageID imap.InternalMessageID) int {
return destUIDs[messageID]
return destUIDs[messageID].UID
}))
}

Expand Down
Loading

0 comments on commit 9c7cb48

Please sign in to comment.