Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement Server.RemoveUser #4

Merged
merged 1 commit into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,6 @@ type Connector interface {
// Authorize returns whether the given username/password combination are valid for this connector.
Authorize(username, password string) bool

// GetUpdates returns a stream of updates that the gluon server should apply.
GetUpdates() <-chan imap.Update

// Pause pauses the stream of updates.
Pause()

// Resume resumes the stream of updates.
Resume()

// ValidateCreate checks whether a mailbox with the given name can be created.
// If so, the flags, permanent flags and attributes which the mailbox would have are returned.
ValidateCreate(name []string) (flags, permFlags, attrs imap.FlagSet, err error)
Expand All @@ -43,12 +34,30 @@ type Connector interface {
// DeleteLabel deletes the label with the given ID.
DeleteLabel(ctx context.Context, mboxID string) error

// GetMessage returns the message with the given ID.
GetMessage(ctx context.Context, messageID string) (imap.Message, []string, error)

// CreateMessage creates a new message on the remote.
CreateMessage(ctx context.Context, mboxID string, literal []byte, flags imap.FlagSet, date time.Time) (imap.Message, error)

// LabelMessages labels the given messages with the given mailbox ID.
LabelMessages(ctx context.Context, messageIDs []string, mboxID string) error

// UnlabelMessages unlabels the given messages with the given mailbox ID.
UnlabelMessages(ctx context.Context, messageIDs []string, mboxID string) error

// MarkMessagesSeen sets the seen value of the given messages.
MarkMessagesSeen(ctx context.Context, messageIDs []string, seen bool) error

// MarkMessagesFlagged sets the flagged value of the given messages.
MarkMessagesFlagged(ctx context.Context, messageIDs []string, flagged bool) error

Close() error
// GetUpdates returns a stream of updates that the gluon server should apply.
GetUpdates() <-chan imap.Update

// Pause pauses the stream of updates.
Pause()

// Resume resumes the stream of updates.
Resume()
}
6 changes: 0 additions & 6 deletions connector/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,6 @@ func (conn *Dummy) MarkMessagesFlagged(ctx context.Context, messageIDs []string,
return nil
}

func (conn *Dummy) Close() error {
conn.ticker.Stop()

return nil
}

func (conn *Dummy) Sync(ctx context.Context) error {
for _, mailbox := range conn.state.getLabels() {
conn.updateCh <- imap.NewMailboxCreated(mailbox)
Expand Down
28 changes: 26 additions & 2 deletions internal/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,28 @@ func (b *Backend) AddUser(conn connector.Connector, store store.Store, client *e
return userID, nil
}

func (b *Backend) RemoveUser(ctx context.Context, userID string) error {
b.usersLock.Lock()
defer b.usersLock.Unlock()

user, ok := b.users[userID]
if !ok {
return ErrNoSuchUser
}

if err := user.close(ctx); err != nil {
return fmt.Errorf("failed to close backend user: %w", err)
}

if err := b.remote.RemoveUser(ctx, userID); err != nil {
return fmt.Errorf("failed to remove remote user: %w", err)
}

delete(b.users, userID)

return nil
}

func (b *Backend) GetState(username, password string) (*State, error) {
b.usersLock.Lock()
defer b.usersLock.Unlock()
Expand All @@ -79,10 +101,12 @@ func (b *Backend) Close(ctx context.Context) error {
b.usersLock.Lock()
defer b.usersLock.Unlock()

for _, user := range b.users {
for userID, user := range b.users {
if err := user.close(ctx); err != nil {
return fmt.Errorf("failed to close backend user: %w", err)
return fmt.Errorf("failed to close backend user (%v): %w", userID, err)
}

delete(b.users, userID)
}

logrus.Debug("Backend was closed")
Expand Down
19 changes: 11 additions & 8 deletions internal/backend/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package backend
import "errors"

var (
ErrNoSuchSnapshot = errors.New("no such snapshot")
ErrNoSuchMessage = errors.New("no such message")
ErrNoSuchMailbox = errors.New("no such mailbox")
ErrExistingMailbox = errors.New("a mailbox with that name already exists")
ErrAlreadySubscribed = errors.New("already subscribed to this mailbox")
ErrAlreadyUnsubscribed = errors.New("not subscribed to this mailbox")
ErrSessionIsNotSelected = errors.New("session is not selected")
ErrNotImplemented = errors.New("not implemented")
ErrNoSuchUser = errors.New("no such user")
ErrNoSuchSnapshot = errors.New("no such snapshot")
ErrNoSuchMessage = errors.New("no such message")
ErrNoSuchMailbox = errors.New("no such mailbox")

ErrExistingMailbox = errors.New("a mailbox with that name already exists")
ErrAlreadySubscribed = errors.New("already subscribed to this mailbox")
ErrAlreadyUnsubscribed = errors.New("not subscribed to this mailbox")
ErrSessionNotSelected = errors.New("session is not selected")

ErrNotImplemented = errors.New("not implemented")
)
2 changes: 1 addition & 1 deletion internal/backend/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (state *State) Mailbox(ctx context.Context, name string, fn func(*Mailbox)

func (state *State) Selected(ctx context.Context, fn func(*Mailbox) error) error {
if !state.IsSelected() {
return ErrSessionIsNotSelected
return ErrSessionNotSelected
}

return state.tx(ctx, func(tx *ent.Tx) error {
Expand Down
69 changes: 40 additions & 29 deletions internal/pchan/pchan.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type PChan[T any] struct {
ready chan struct{}
done chan struct{}
lock sync.Mutex
wg sync.WaitGroup
}

type item[T any] struct {
Expand Down Expand Up @@ -46,6 +47,7 @@ func (ch *PChan[T]) Push(val T, withPrio ...int) chan struct{} {
panic("channel is closed")

default:
// ...
}

var prio int
Expand All @@ -66,41 +68,61 @@ func (ch *PChan[T]) Push(val T, withPrio ...int) chan struct{} {

go func() { ch.ready <- struct{}{} }()

return done
}

// Len returns the number of items queued.
func (ch *PChan[T]) Len() int {
ch.lock.Lock()
defer ch.lock.Unlock()
ch.wg.Add(1)

return len(ch.items)
return done
}

// Pop blocks until an item is available, then returns that item.
// If the channel is already closed, the call returns immediately and the bool value is false.
func (ch *PChan[T]) Pop() (T, bool) {
func (ch *PChan[T]) Pop() (t T, ok bool) {
select {
case <-ch.ready:
// ...

case <-ch.done:
// ...
}

return ch.pop()
ch.lock.Lock()
defer ch.lock.Unlock()

if len(ch.items) == 0 {
return t, false
}

var item *item[T]

item, ch.items = ch.items[0], ch.items[1:]

defer close(item.done)

ch.wg.Done()

return item.val, true
}

// Peek returns the highest priority item, if any.
// The bool is true if an item was available.
func (ch *PChan[T]) Peek() (T, bool) {
func (ch *PChan[T]) Peek() (t T, ok bool) {
ch.lock.Lock()
defer ch.lock.Unlock()

if len(ch.items) == 0 {
return *new(T), false
return t, false
}

return ch.items[0].val, true
}

// Len returns the number of items queued.
func (ch *PChan[T]) Len() int {
ch.lock.Lock()
defer ch.lock.Unlock()

return len(ch.items)
}

// Range repeatedly calls the callback with items as they are pushed onto the channel.
// It stops when the channel is closed.
func (ch *PChan[T]) Range(fn func(T)) {
Expand All @@ -125,6 +147,11 @@ func (ch *PChan[T]) Apply(fn func(T)) {
}
}

// Wait blocks until the queue is empty.
func (ch *PChan[T]) Wait() {
ch.wg.Wait()
}

// Close closes the channel, returning whatever was still queued on the channel.
func (ch *PChan[T]) Close() []T {
ch.lock.Lock()
Expand All @@ -135,6 +162,7 @@ func (ch *PChan[T]) Close() []T {
panic("channel is closed")

default:
// ...
}

for range ch.items {
Expand All @@ -159,23 +187,6 @@ func (ch *PChan[T]) String() string {
return res
}

func (ch *PChan[T]) pop() (T, bool) {
ch.lock.Lock()
defer ch.lock.Unlock()

if len(ch.items) == 0 {
return *new(T), false
}

var item *item[T]

item, ch.items = ch.items[0], ch.items[1:]

defer close(item.done)

return item.val, true
}

//nolint:gosec
func (ch *PChan[T]) getPosition(prio int) int {
lo := slices.IndexFunc(ch.items, func(item *item[T]) bool {
Expand Down
29 changes: 29 additions & 0 deletions internal/remote/manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package remote

import (
"context"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -51,6 +52,34 @@ func (m *Manager) AddUser(userID string, conn connector.Connector) (*User, error
return user, nil
}

// RemoveUser removes the user with the given ID from the remote manager.
// It waits until all the user's queued operations have been performed.
// TODO: Find a better way to flush the operation queue?
func (m *Manager) RemoveUser(ctx context.Context, userID string) error {
m.usersLock.Lock()
defer m.usersLock.Unlock()

user, ok := m.users[userID]
if !ok {
return ErrNoSuchUser
}

user.queue.Wait()
LBeernaertProton marked this conversation as resolved.
Show resolved Hide resolved

path, err := m.getQueuePath(userID)
if err != nil {
return err
}

if err := os.Remove(path); err != nil {
return err
}

delete(m.users, userID)

return nil
}

// GetUserID returns the user ID of the user with the given credentials.
func (m *Manager) GetUserID(username, password string) (string, error) {
m.usersLock.Lock()
Expand Down
11 changes: 4 additions & 7 deletions internal/remote/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package remote

import (
"errors"
"fmt"
"io"
"os"
"sync"
Expand Down Expand Up @@ -70,7 +71,7 @@ func (user *User) GetUpdates() <-chan imap.Update {
func (user *User) Close() error {
ops, err := user.closeQueue()
if err != nil {
return err
return fmt.Errorf("failed to close queue: %w", err)
}

if user.lastOp != nil {
Expand All @@ -79,15 +80,11 @@ func (user *User) Close() error {

b, err := saveOps(ops)
if err != nil {
return err
return fmt.Errorf("failed to serialize operations: %w", err)
}

if err := os.WriteFile(user.path, b, 0o600); err != nil {
return err
}

if err := user.conn.Close(); err != nil {
return err
return fmt.Errorf("failed to save operations: %w", err)
}

return nil
Expand Down
13 changes: 13 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,19 @@ func (s *Server) AddUser(conn connector.Connector, store store.Store, driver, so
return userID, nil
}

// RemoveUser removes a user from the mailserver.
func (s *Server) RemoveUser(ctx context.Context, userID string) error {
if err := s.backend.RemoveUser(ctx, userID); err != nil {
return err
}

s.publish(events.EventUserRemoved{
UserID: userID,
})

return nil
}

// AddWatcher adds a new watcher.
func (s *Server) AddWatcher() chan events.Event {
s.watchersLock.Lock()
Expand Down
5 changes: 5 additions & 0 deletions tests/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ func runServer(tb testing.TB, credentials map[string]string, delim string, tests
conn.Flush()
}

// Remove all users before shutdown.
for _, userID := range userIDs {
require.NoError(tb, server.RemoveUser(ctx, userID))
}

// Expect the server to shut down successfully when closed.
require.NoError(tb, server.Close(ctx))
require.NoError(tb, <-errCh)
Expand Down
Loading