Skip to content

Commit

Permalink
fix(GODT-1570): Fix too many SQL Variable in sync benchmark
Browse files Browse the repository at this point in the history
Batch incoming connector updates for new message to 1000 messages per
transaction to avoid running into too many SQL variables.

We batched at outside of the transaction level so as not to completely
block the other threads forever until we are done processing a high
number of updates.

The code has also been updated to use a more optimized database query.
  • Loading branch information
LBeernaertProton committed Sep 12, 2022
1 parent 8c9b3f1 commit c08f510
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 55 deletions.
115 changes: 64 additions & 51 deletions internal/backend/connector_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,76 +159,89 @@ func (user *user) applyMessagesCreated(ctx context.Context, update *imap.Message
return err
}

var reqs []*db.CreateMessageReq
remoteMessageRequestsMap := make(map[imap.MessageID]*db.CreateMessageReq, len(updates))

remoteToLocalMessageID := make(map[imap.MessageID]imap.InternalMessageID)
const maxUpdateChunk = 1000

return db.WriteAndStore(ctx, user.db, user.store, func(ctx context.Context, tx *ent.Tx, storeTx store.Transaction) error {
for _, update := range updates {
internalID := uuid.NewString()
chunkedUpdates := xslices.Chunk(updates, maxUpdateChunk)

literal, err := rfc822.SetHeaderValue(update.Literal, ids.InternalIDKey, internalID)
if err != nil {
return fmt.Errorf("failed to set internal ID: %w", err)
}
for _, chunk := range chunkedUpdates {
messagesForMailboxes := make(map[imap.InternalMailboxID][]*db.CreateMessageReq)

if err := storeTx.Set(imap.InternalMessageID(internalID), literal); err != nil {
return fmt.Errorf("failed to store message literal: %w", err)
}
if err := db.WriteAndStore(ctx, user.db, user.store, func(ctx context.Context, tx *ent.Tx, storeTx store.Transaction) error {
for _, update := range chunk {

reqs = append(reqs, &db.CreateMessageReq{
Message: update.Message,
Literal: literal,
Body: update.Body,
Structure: update.Structure,
Envelope: update.Envelope,
InternalID: imap.InternalMessageID(internalID),
})
var request *db.CreateMessageReq

remoteToLocalMessageID[update.Message.ID] = imap.InternalMessageID(internalID)
}
// Do not reprocess the message if we have previously processed it.
if req, ok := remoteMessageRequestsMap[update.Message.ID]; ok {
request = req
} else {

if _, err := db.CreateMessages(ctx, tx, reqs...); err != nil {
return fmt.Errorf("failed to create message: %w", err)
}
internalID := uuid.NewString()

messageIDs := make(map[imap.LabelID][]ids.MessageIDPair)
literal, err := rfc822.SetHeaderValue(update.Literal, ids.InternalIDKey, internalID)
if err != nil {
return fmt.Errorf("failed to set internal ID: %w", err)
}

for _, update := range updates {
for _, mailboxID := range update.MailboxIDs {
localID := remoteToLocalMessageID[update.Message.ID]
idPair := ids.MessageIDPair{
InternalID: localID,
RemoteID: update.Message.ID,
}
if err := storeTx.Set(imap.InternalMessageID(internalID), literal); err != nil {
return fmt.Errorf("failed to store message literal: %w", err)
}

request = &db.CreateMessageReq{
Message: update.Message,
Literal: literal,
Body: update.Body,
Structure: update.Structure,
Envelope: update.Envelope,
InternalID: imap.InternalMessageID(internalID),
}

if !slices.Contains(messageIDs[mailboxID], idPair) {
messageIDs[mailboxID] = append(messageIDs[mailboxID], idPair)
remoteMessageRequestsMap[update.Message.ID] = req
}
}
}

for mailboxID, messageIDs := range messageIDs {
internalMailboxID, err := db.GetMailboxIDWithRemoteID(ctx, tx.Client(), mailboxID)
if err != nil {
return err
}
for _, mbox := range update.MailboxIDs {
internalMailboxID, err := db.GetMailboxIDWithRemoteID(ctx, tx.Client(), mbox)
if err != nil {
return err
}

internalIDs := xslices.Map(messageIDs, func(id ids.MessageIDPair) imap.InternalMessageID {
return id.InternalID
})
existingRequests := messagesForMailboxes[internalMailboxID]

messageUIDs, err := db.AddMessagesToMailbox(ctx, tx, internalIDs, internalMailboxID)
if err != nil {
return err
var alreadyPresent bool

for _, e := range existingRequests {
if e.Message.ID == request.Message.ID {
alreadyPresent = true
break
}
}

if alreadyPresent {
continue
}

messagesForMailboxes[internalMailboxID] = append(messagesForMailboxes[internalMailboxID], request)
}
}

user.queueStateUpdate(state.NewExistsStateUpdate(internalMailboxID, messageIDs, messageUIDs, nil))
for mbox, request := range messagesForMailboxes {
result, err := db.CreateAndAddMessagesToMailbox(ctx, tx, mbox, request)
if err != nil {
return err
}

user.queueStateUpdate(state.NewExistsStateUpdate(mbox, result, nil))
}

return nil
}); err != nil {
return err
}
}

return nil
})
return nil
}

// applyMessageLabelsUpdated applies a MessageLabelsUpdated update.
Expand Down
89 changes: 89 additions & 0 deletions internal/db/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ProtonMail/gluon/internal/db/ent/message"
"github.com/ProtonMail/gluon/internal/db/ent/messageflag"
"github.com/ProtonMail/gluon/internal/db/ent/uid"
"github.com/ProtonMail/gluon/internal/ids"
"github.com/bradenaw/juniper/xslices"
)

Expand Down Expand Up @@ -131,6 +132,7 @@ func CreateAndAddMessageToMailbox(ctx context.Context, tx *ent.Tx, mboxID imap.I
SetMessage(message).
SetUID(mbox.UIDNext).
Save(ctx)

if err != nil {
return 0, imap.FlagSet{}, err
}
Expand All @@ -142,6 +144,93 @@ func CreateAndAddMessageToMailbox(ctx context.Context, tx *ent.Tx, mboxID imap.I
return uid.UID, NewFlagSet(uid, flags), err
}

type CreateAndAddMessagesResult struct {
UID imap.UID
Flags imap.FlagSet
MessageID ids.MessageIDPair
}

func CreateAndAddMessagesToMailbox(ctx context.Context, tx *ent.Tx, mboxID imap.InternalMailboxID, requests []*CreateMessageReq) ([]CreateAndAddMessagesResult, error) {
mbox, err := tx.Mailbox.Query().Where(mailbox.ID(mboxID)).Select(mailbox.FieldID, mailbox.FieldUIDNext).Only(ctx)
if err != nil {
return nil, err
}

msgBuilders := make([]*ent.MessageCreate, 0, len(requests))
flags := make([][]*ent.MessageFlag, 0, len(requests))

for _, request := range requests {
builders := xslices.Map(request.Message.Flags.ToSlice(), func(flag string) *ent.MessageFlagCreate {
return tx.MessageFlag.Create().SetValue(flag)
})

entFlags, err := tx.MessageFlag.CreateBulk(builders...).Save(ctx)
if err != nil {
return nil, err
}

flags = append(flags, entFlags)

msgCreate := tx.Message.Create().
SetID(request.InternalID).
SetDate(request.Message.Date).
SetBody(request.Body).
SetBodyStructure(request.Structure).
SetEnvelope(request.Envelope).
SetSize(len(request.Literal)).
AddFlags(entFlags...)

if len(request.Message.ID) != 0 {
msgCreate = msgCreate.SetRemoteID(request.Message.ID)
}

msgBuilders = append(msgBuilders, msgCreate)
}

messages, err := tx.Message.CreateBulk(msgBuilders...).Save(ctx)
if err != nil {
return nil, err
}

uidBuilders := make([]*ent.UIDCreate, 0, len(requests))

for i, message := range messages {
uidBuilders = append(uidBuilders, tx.UID.Create().
SetMailboxID(mbox.ID).
SetMessageID(message.ID).
SetUID(mbox.UIDNext.Add(uint32(i))),
)
}

uids, err := tx.UID.CreateBulk(uidBuilders...).Save(ctx)
if err != nil {
return nil, err
}

if err := BumpMailboxUIDNext(ctx, tx, mbox, len(requests)); err != nil {
return nil, err
}

result := make([]CreateAndAddMessagesResult, 0, len(requests))

for i := 0; i < len(requests); i++ {
if uids[i].UID != mbox.UIDNext.Add(uint32(i)) {
panic("Invalid UID ")
}

result = append(result, CreateAndAddMessagesResult{
MessageID: ids.MessageIDPair{
InternalID: messages[i].ID,
RemoteID: messages[i].RemoteID,
},
UID: uids[i].UID,
Flags: NewFlagSet(uids[i], flags[i]),
})
}

return result, err
}

func BumpMailboxUIDsForMessage(ctx context.Context, tx *ent.Tx, messageIDs []imap.InternalMessageID, mboxID imap.InternalMailboxID) (map[imap.InternalMessageID]*ent.UID, error) {
messageUIDs := make(map[imap.InternalMessageID]imap.UID)

Expand Down
7 changes: 3 additions & 4 deletions internal/state/responders.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ type ExistsStateUpdate struct {
targetStateSet bool
}

func NewExistsStateUpdate(mailboxID imap.InternalMailboxID, messageIDs []ids.MessageIDPair, uids map[imap.InternalMessageID]*ent.UID, s *State) Update {
func NewExistsStateUpdate(mailboxID imap.InternalMailboxID, messages []db.CreateAndAddMessagesResult, s *State) Update {
var stateID StateID

var targetStateSet bool
Expand All @@ -159,9 +159,8 @@ func NewExistsStateUpdate(mailboxID imap.InternalMailboxID, messageIDs []ids.Mes
targetStateSet = true
}

responders := xslices.Map(messageIDs, func(messageID ids.MessageIDPair) *exists {
uid := uids[messageID.InternalID]
exists := newExists(ids.NewMessageIDPair(uid.Edges.Message), uid.UID, db.NewFlagSet(uid, uid.Edges.Message.Edges.Flags))
responders := xslices.Map(messages, func(result db.CreateAndAddMessagesResult) *exists {
exists := newExists(result.MessageID, result.UID, result.Flags)

return exists
})
Expand Down

0 comments on commit c08f510

Please sign in to comment.