Skip to content

Commit

Permalink
feat(GODT-2252): Recover from missing cache files
Browse files Browse the repository at this point in the history
Allow commands such as fetch and search to re-download literals from the
connector if they are no longer available.

To ensure parallel fetches which download the same literal don't cause
file system issue with writing multiple files, we wrap the store
implementation with a layer that controls access to the message with a
RWLock.
  • Loading branch information
LBeernaertProton committed Jan 17, 2023
1 parent 2a510f1 commit bc9312a
Show file tree
Hide file tree
Showing 16 changed files with 334 additions and 12 deletions.
4 changes: 4 additions & 0 deletions connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ type Connector interface {
// CreateMailbox creates a mailbox with the given name.
CreateMailbox(ctx context.Context, name []string) (imap.Mailbox, error)

// GetMessageLiteral is intended to be used by Gluon when, for some reason, the local cached data no longer exists.
// Note: this can get called from different go routines.
GetMessageLiteral(ctx context.Context, id imap.MessageID) ([]byte, error)

// IsMailboxVisible can be used to hide mailboxes from connected clients.
IsMailboxVisible(ctx context.Context, mboxID imap.MailboxID) bool

Expand Down
4 changes: 4 additions & 0 deletions connector/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ func (conn *Dummy) DeleteMailbox(ctx context.Context, mboxID imap.MailboxID) err
return nil
}

func (conn *Dummy) GetMessageLiteral(ctx context.Context, id imap.MessageID) ([]byte, error) {
return conn.state.tryGetLiteral(id)
}

func (conn *Dummy) CreateMessage(ctx context.Context, mboxID imap.MailboxID, literal []byte, flags imap.FlagSet, date time.Time) (imap.Message, []byte, error) {
// NOTE: We are only recording this here since it was the easiest command to verify the data has been record properly
// in the context, as APPEND will always require a communication with the remote connector.
Expand Down
12 changes: 12 additions & 0 deletions connector/dummy_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,18 @@ func (state *dummyState) getLiteral(messageID imap.MessageID) []byte {
return state.messages[messageID].literal
}

func (state *dummyState) tryGetLiteral(messageID imap.MessageID) ([]byte, error) {
state.lock.Lock()
defer state.lock.Unlock()

v, ok := state.messages[messageID]
if !ok {
return nil, ErrNoSuchMessage
}

return v.literal, nil
}

func (state *dummyState) createMessage(
mboxID imap.MailboxID,
literal []byte,
Expand Down
6 changes: 6 additions & 0 deletions internal/backend/state_connector_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ func (sc *stateConnectorImpl) CreateMessage(
return imap.NewInternalMessageID(), msg, newLiteral, nil
}

func (sc *stateConnectorImpl) GetMessageLiteral(ctx context.Context, id imap.MessageID) ([]byte, error) {
ctx = sc.newContextWithMetadata(ctx)

return sc.connector.GetMessageLiteral(ctx, id)
}

func (sc *stateConnectorImpl) AddMessagesToMailbox(
ctx context.Context,
messageIDs []imap.MessageID,
Expand Down
2 changes: 1 addition & 1 deletion internal/backend/state_user_interface_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s *StateUserInterfaceImpl) GetRemote() state.Connector {
return s.c
}

func (s *StateUserInterfaceImpl) GetStore() store.Store {
func (s *StateUserInterfaceImpl) GetStore() *store.WriteControlledStore {
return s.u.store
}

Expand Down
6 changes: 3 additions & 3 deletions internal/backend/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type user struct {

connector connector.Connector
updateInjector *updateInjector
store store.Store
store *store.WriteControlledStore
delimiter string

db *db.DB
Expand All @@ -49,7 +49,7 @@ func newUser(
userID string,
database *db.DB,
conn connector.Connector,
store store.Store,
st store.Store,
delimiter string,
imapLimits limits.IMAP,
) (*user, error) {
Expand Down Expand Up @@ -79,7 +79,7 @@ func newUser(

connector: conn,
updateInjector: newUpdateInjector(conn, userID),
store: store,
store: store.NewWriteControlledStore(st),
delimiter: delimiter,

db: database,
Expand Down
6 changes: 3 additions & 3 deletions internal/state/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (state *State) actionCreateMessage(
return 0, fmt.Errorf("failed to set internal ID: %w", err)
}

if err := state.user.GetStore().Set(internalID, literalWithHeader); err != nil {
if err := state.user.GetStore().SetUnchecked(internalID, literalWithHeader); err != nil {
return 0, fmt.Errorf("failed to store message literal: %w", err)
}

Expand Down Expand Up @@ -196,7 +196,7 @@ func (state *State) actionCreateRecoveredMessage(
return err
}

if err := state.user.GetStore().Set(internalID, literal); err != nil {
if err := state.user.GetStore().SetUnchecked(internalID, literal); err != nil {
return fmt.Errorf("failed to store message literal: %w", err)
}

Expand Down Expand Up @@ -343,7 +343,7 @@ func (state *State) actionImportRecoveredMessage(
return ids.MessageIDPair{}, false, fmt.Errorf("failed to set internal ID: %w", err)
}

if err := state.user.GetStore().Set(internalID, literalWithHeader); err != nil {
if err := state.user.GetStore().SetUnchecked(internalID, literalWithHeader); err != nil {
return ids.MessageIDPair{}, false, fmt.Errorf("failed to store message literal: %w", err)
}

Expand Down
4 changes: 4 additions & 0 deletions internal/state/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type Connector interface {
date time.Time,
) (imap.InternalMessageID, imap.Message, []byte, error)

// GetMessageLiteral retrieves the message literal from the connector.
// Note: this can get called from different go routines.
GetMessageLiteral(ctx context.Context, id imap.MessageID) ([]byte, error)

// AddMessagesToMailbox adds the message with the given ID to the mailbox with the given ID.
AddMessagesToMailbox(
ctx context.Context,
Expand Down
2 changes: 1 addition & 1 deletion internal/state/mailbox_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (m *Mailbox) Fetch(ctx context.Context, seq *proto.SequenceSet, attributes
var literal []byte

if needsLiteral {
l, err := m.state.getLiteral(msg.ID.InternalID)
l, err := m.state.getLiteral(ctx, msg.ID)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/state/mailbox_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func buildSearchData(ctx context.Context, m *Mailbox, op *buildSearchOpResult, m
}

if op.needsLiteral {
l, err := m.state.getLiteral(message.ID.InternalID)
l, err := m.state.getLiteral(ctx, message.ID)
if err != nil {
return searchData{}, err
}
Expand Down
28 changes: 26 additions & 2 deletions internal/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,8 +621,32 @@ func (state *State) endIdle() {
state.idleCh = nil
}

func (state *State) getLiteral(messageID imap.InternalMessageID) ([]byte, error) {
return state.user.GetStore().Get(messageID)
func (state *State) getLiteral(ctx context.Context, messageID ids.MessageIDPair) ([]byte, error) {
var literal []byte

storeLiteral, firstErr := state.user.GetStore().Get(messageID.InternalID)
if firstErr != nil {
logrus.Debugf("Failed load %v from store, attempting to download from connector", messageID.InternalID.ShortID())

connectorLiteral, err := state.user.GetRemote().GetMessageLiteral(ctx, messageID.RemoteID)
if err != nil {
logrus.Errorf("Failed to download message from connector: %v", err)
return nil, fmt.Errorf("message failed to load from cache (%v), failed to download from connector: %w", firstErr, err)
}

if err := state.user.GetStore().Set(messageID.InternalID, connectorLiteral); err != nil {
logrus.Errorf("Failed to store download message from connector: %v", err)
return nil, fmt.Errorf("message failed to load from cache (%v), failed to store new downloaded message: %w", firstErr, err)
}

logrus.Debugf("Message %v downloaded and stored ", messageID.InternalID.ShortID())

literal = connectorLiteral
} else {
literal = storeLiteral
}

return literal, nil
}

func (state *State) flushResponses(ctx context.Context, permitExpunge bool) ([]response.Response, error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/state/user_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type UserInterface interface {

GetRemote() Connector

GetStore() store.Store
GetStore() *store.WriteControlledStore

QueueOrApplyStateUpdate(ctx context.Context, tx *ent.Tx, update ...Update) error

Expand Down
4 changes: 4 additions & 0 deletions store/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func (c *onDiskStore) Set(messageID imap.InternalMessageID, b []byte) error {
b = enc
}

if err := os.MkdirAll(c.path, 0o700); err != nil {
return err
}

return os.WriteFile(
filepath.Join(c.path, messageID.String()),
c.gcm.Seal(nonce, nonce, b, nil),
Expand Down
140 changes: 140 additions & 0 deletions store/write_controlled_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package store

import (
"github.com/ProtonMail/gluon/imap"
"sync"
"sync/atomic"
)

type syncRef struct {
lock sync.RWMutex
counter int32
}

// WriteControlledStore ensures that a given file on disk can safely be accessed by multiple readers and only
// one writer. Internally we maintain a list of RWLocks per message ID.
type WriteControlledStore struct {
impl Store

lock sync.Mutex
entryTable map[imap.InternalMessageID]*syncRef
lockPool []*syncRef
}

func NewWriteControlledStore(impl Store) *WriteControlledStore {
return &WriteControlledStore{
impl: impl,
entryTable: make(map[imap.InternalMessageID]*syncRef),
}
}

func (w *WriteControlledStore) acquireSyncRef(id imap.InternalMessageID) *syncRef {
w.lock.Lock()
defer w.lock.Unlock()

v, ok := w.entryTable[id]
if !ok {
var s *syncRef

if len(w.lockPool) != 0 {
s = w.lockPool[0]
s.counter = 1
w.lockPool = w.lockPool[1:]
} else {
s = &syncRef{counter: 1}
}

w.entryTable[id] = s

return s
}

atomic.AddInt32(&v.counter, 1)

return v
}

func (w *WriteControlledStore) releaseSyncRef(id imap.InternalMessageID, ref *syncRef) {
if atomic.AddInt32(&ref.counter, -1) <= 0 {
w.lock.Lock()
defer w.lock.Unlock()

if atomic.LoadInt32(&ref.counter) <= 0 {
delete(w.entryTable, id)
w.lockPool = append(w.lockPool, ref)
}
}
}

func (w *WriteControlledStore) Get(messageID imap.InternalMessageID) ([]byte, error) {
syncRef := w.acquireSyncRef(messageID)
defer w.releaseSyncRef(messageID, syncRef)

syncRef.lock.RLock()
defer syncRef.lock.RUnlock()

return w.impl.Get(messageID)
}

func (w *WriteControlledStore) Set(messageID imap.InternalMessageID, literal []byte) error {
syncRef := w.acquireSyncRef(messageID)
defer w.releaseSyncRef(messageID, syncRef)

syncRef.lock.Lock()
defer syncRef.lock.Unlock()

return w.impl.Set(messageID, literal)
}

// SetUnchecked allows the user to bypass lock access. This will only work if you can guarantee that the data being
// set does not previously exit (e.g: New message).
func (w *WriteControlledStore) SetUnchecked(messageID imap.InternalMessageID, literal []byte) error {
return w.impl.Set(messageID, literal)
}

func (w *WriteControlledStore) Delete(messageID ...imap.InternalMessageID) error {
for _, id := range messageID {
if err := func() error {
syncRef := w.acquireSyncRef(id)
defer w.releaseSyncRef(id, syncRef)

syncRef.lock.Lock()
defer syncRef.lock.Unlock()

return w.impl.Delete(messageID...)
}(); err != nil {
return err
}
}

return nil
}

func (w *WriteControlledStore) Close() error {
return w.impl.Close()
}

func (w *WriteControlledStore) List() ([]imap.InternalMessageID, error) {
return w.impl.List()
}

type WriteControlledStoreBuilder struct {
builder Builder
}

func NewWriteControlledStoreBuilder(builder Builder) *WriteControlledStoreBuilder {
return &WriteControlledStoreBuilder{builder: builder}
}

func (w *WriteControlledStoreBuilder) New(dir, userID string, passphrase []byte) (Store, error) {
impl, err := w.builder.New(dir, userID, passphrase)
if err != nil {
return nil, err
}

return NewWriteControlledStore(impl), nil
}

func (w *WriteControlledStoreBuilder) Delete(dir, userID string) error {
return w.builder.Delete(dir, userID)
}
Loading

0 comments on commit bc9312a

Please sign in to comment.