Skip to content

Commit

Permalink
fix(GODT-2425): Ensure connector updates lock db for writes
Browse files Browse the repository at this point in the history
Ensure all connector updates perform all look ups in the transaction
scope so that the modifications are up to date.
  • Loading branch information
LBeernaertProton committed Mar 6, 2023
1 parent e573f31 commit 2cc03e5
Showing 1 changed file with 53 additions and 73 deletions.
126 changes: 53 additions & 73 deletions internal/backend/connector_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,24 +125,29 @@ func (user *user) applyMailboxDeleted(ctx context.Context, update *imap.MailboxD
return fmt.Errorf("attempting to delete protected mailbox (recovery)")
}

internalMailboxID, err := db.ReadResult(ctx, user.db, func(ctx context.Context, client *ent.Client) (imap.InternalMailboxID, error) {
return db.GetMailboxIDFromRemoteID(ctx, client, update.MailboxID)
})
if err != nil {
if ent.IsNotFound(err) {
return nil
stateUpdate, err := db.WriteResult(ctx, user.db, func(ctx context.Context, tx *ent.Tx) (state.Update, error) {
internalMailboxID, err := db.GetMailboxIDFromRemoteID(ctx, tx.Client(), update.MailboxID)
if err != nil {
if ent.IsNotFound(err) {
return nil, nil
}

return nil, err
}

return err
}
if err := db.DeleteMailboxWithRemoteID(ctx, tx, update.MailboxID); err != nil {
return nil, err
}

if err := user.db.Write(ctx, func(ctx context.Context, tx *ent.Tx) error {
return db.DeleteMailboxWithRemoteID(ctx, tx, update.MailboxID)
}); err != nil {
return state.NewMailboxDeletedStateUpdate(internalMailboxID), nil
})
if err != nil {
return err
}

user.queueStateUpdate(state.NewMailboxDeletedStateUpdate(internalMailboxID))
if stateUpdate != nil {
user.queueStateUpdate(stateUpdate)
}

return nil
}
Expand All @@ -153,26 +158,24 @@ func (user *user) applyMailboxUpdated(ctx context.Context, update *imap.MailboxU
return fmt.Errorf("attempting to rename protected mailbox (recovery)")
}

if exists, err := db.ReadResult(ctx, user.db, func(ctx context.Context, client *ent.Client) (bool, error) {
return db.MailboxExistsWithRemoteID(ctx, client, update.MailboxID)
}); err != nil {
return err
} else if !exists {
return nil
}
return user.db.Write(ctx, func(ctx context.Context, tx *ent.Tx) error {
client := tx.Client()

currentName, err := db.ReadResult(ctx, user.db, func(ctx context.Context, client *ent.Client) (string, error) {
return db.GetMailboxNameWithRemoteID(ctx, client, update.MailboxID)
})
if err != nil {
return err
}
if exists, err := db.MailboxExistsWithRemoteID(ctx, client, update.MailboxID); err != nil {
return err
} else if !exists {
return nil
}

if currentName == strings.Join(update.MailboxName, user.delimiter) {
return nil
}
currentName, err := db.GetMailboxNameWithRemoteID(ctx, client, update.MailboxID)
if err != nil {
return err
}

if currentName == strings.Join(update.MailboxName, user.delimiter) {
return nil
}

return user.db.Write(ctx, func(ctx context.Context, tx *ent.Tx) error {
return db.RenameMailboxWithRemoteID(ctx, tx, update.MailboxID, strings.Join(update.MailboxName, user.delimiter))
})
}
Expand Down Expand Up @@ -207,7 +210,9 @@ func (user *user) applyMessagesCreated(ctx context.Context, update *imap.Message
messageForMBox := make(map[imap.InternalMailboxID][]imap.InternalMessageID)
mboxInternalIDMap := make(map[imap.MailboxID]imap.InternalMailboxID)

if err := user.db.Read(ctx, func(ctx context.Context, client *ent.Client) error {
return user.db.Write(ctx, func(ctx context.Context, tx *ent.Tx) error {
client := tx.Client()

for _, message := range update.Messages {
if slices.Contains(message.MailboxIDs, ids.GluonInternalRecoveryMailboxRemoteID) {
logrus.Errorf("attempting to import messages into protected mailbox (recovery), skipping")
Expand Down Expand Up @@ -277,19 +282,11 @@ func (user *user) applyMessagesCreated(ctx context.Context, update *imap.Message
}
}
}
return nil
}); err != nil {
return err
}

if len(messagesToCreate) == 0 && len(messageForMBox) == 0 {
return nil
}
if len(messagesToCreate) == 0 && len(messageForMBox) == 0 {
return nil
}

// We sadly have to split this up into two separate transactions where we create the messages and one where we
// assign them to the mailbox. There's an upper limit to the number of items badger can track in one transaction.
// This way we can keep the database consistent.
if err := user.db.Write(ctx, func(ctx context.Context, tx *ent.Tx) error {
for _, chunk := range xslices.Chunk(messagesToCreate, db.ChunkLimit) {
// Create messages in the store in parallel
numStoreRoutines := runtime.NumCPU() / 4
Expand All @@ -315,12 +312,6 @@ func (user *user) applyMessagesCreated(ctx context.Context, update *imap.Message
}
}

return nil
}); err != nil {
return err
}

return user.db.Write(ctx, func(ctx context.Context, tx *ent.Tx) error {
// Assign all the messages to the mailbox
for mboxID, msgList := range messageForMBox {
inMailbox, err := db.FilterMailboxContainsInternalID(ctx, tx.Client(), mboxID, msgList)
Expand Down Expand Up @@ -357,37 +348,24 @@ func (user *user) applyMessageMailboxesUpdated(ctx context.Context, update *imap
return state.ErrNoSuchMessage
}

type Result struct {
InternalMsgID imap.InternalMessageID
InternalMBoxIDs []imap.InternalMailboxID
}
return user.db.Write(ctx, func(ctx context.Context, tx *ent.Tx) error {
client := tx.Client()

result, err := db.ReadResult(ctx, user.db, func(ctx context.Context, client *ent.Client) (Result, error) {
internalMsgID, err := db.GetMessageIDFromRemoteID(ctx, client, update.MessageID)
if err != nil {
return Result{}, err
return err
}

internalMBoxIDs, err := db.TranslateRemoteMailboxIDs(ctx, client, update.MailboxIDs)
if err != nil {
return Result{}, err
return err
}

return Result{
InternalMsgID: internalMsgID,
InternalMBoxIDs: internalMBoxIDs,
}, nil
})
if err != nil {
return err
}

return user.db.Write(ctx, func(ctx context.Context, tx *ent.Tx) error {
if err := user.setMessageMailboxes(ctx, tx, result.InternalMsgID, result.InternalMBoxIDs); err != nil {
if err := user.setMessageMailboxes(ctx, tx, internalMsgID, internalMBoxIDs); err != nil {
return err
}

if err := user.setMessageFlags(ctx, tx, result.InternalMsgID, update.Seen, update.Flagged, update.Draft); err != nil {
if err := user.setMessageFlags(ctx, tx, internalMsgID, update.Seen, update.Flagged, update.Draft); err != nil {
return err
}

Expand All @@ -405,14 +383,16 @@ func (user *user) applyMessageFlagsUpdated(ctx context.Context, update *imap.Mes
return state.ErrNoSuchMessage
}

internalMsgID, err := db.ReadResult(ctx, user.db, func(ctx context.Context, client *ent.Client) (imap.InternalMessageID, error) {
return db.GetMessageIDFromRemoteID(ctx, client, update.MessageID)
})
if err != nil {
return err
}

return user.db.Write(ctx, func(ctx context.Context, tx *ent.Tx) error {
internalMsgID, err := db.GetMessageIDFromRemoteID(ctx, tx.Client(), update.MessageID)

if err != nil {
if ent.IsNotFound(err) {
return state.ErrNoSuchMessage
}
return err
}

if err := user.setMessageFlags(ctx, tx, internalMsgID, update.Seen, update.Flagged, update.Draft); err != nil {
return err
}
Expand Down

0 comments on commit 2cc03e5

Please sign in to comment.