Skip to content

Commit

Permalink
fix: Wait for remote update goroutine to finshes
Browse files Browse the repository at this point in the history
Wait until the goroutine repsonsible for updates from the remote
finishes to ensure that all references to the database are proplery
releases.

This was causing test cleanup on windows to fail since the go-routine
was still active when the test had finished executing.
  • Loading branch information
LBeernaertProton committed Jun 28, 2022
1 parent 6333a4e commit 253c01c
Showing 1 changed file with 29 additions and 14 deletions.
43 changes: 29 additions & 14 deletions internal/backend/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type user struct {
states map[int]*State
statesLock sync.RWMutex
nextStateID int

updateStopCh chan struct{}
updateWG sync.WaitGroup
}

func newUser(ctx context.Context, userID string, client *ent.Client, remote *remote.User, store store.Store, delimiter string) (*user, error) {
Expand All @@ -32,27 +35,35 @@ func newUser(ctx context.Context, userID string, client *ent.Client, remote *rem
}

user := &user{
userID: userID,
remote: remote,
store: store,
delimiter: delimiter,
client: client,
states: make(map[int]*State),
userID: userID,
remote: remote,
store: store,
delimiter: delimiter,
client: client,
states: make(map[int]*State),
updateStopCh: make(chan struct{}),
}

if err := user.deleteAllMessagesMarkedDeleted(ctx); err != nil {
return nil, err
}

user.updateWG.Add(1)

go func() {
for update := range remote.GetUpdates() {
update := update

if err := user.tx(context.Background(), func(tx *ent.Tx) error {
defer update.Done()
return user.apply(context.Background(), tx, update)
}); err != nil {
logrus.WithError(err).Error("Failed to apply update")
defer user.updateWG.Done()

for {
select {
case update := <-remote.GetUpdates():
if err := user.tx(context.Background(), func(tx *ent.Tx) error {
defer update.Done()
return user.apply(context.Background(), tx, update)
}); err != nil {
logrus.WithError(err).Errorf("Failed to apply update: %v", update)
}
case <-user.updateStopCh:
return
}
}
}()
Expand Down Expand Up @@ -99,6 +110,10 @@ func (user *user) tx(ctx context.Context, fn func(tx *ent.Tx) error) error {
func (user *user) close(ctx context.Context) error {
user.closeStates()

// Wait until the connector update go routine has finished.
close(user.updateStopCh)
user.updateWG.Wait()

if err := user.remote.CloseAndSerializeOperationQueue(); err != nil {
return fmt.Errorf("failed to close user remote: %w", err)
}
Expand Down

0 comments on commit 253c01c

Please sign in to comment.