diff --git a/protocol/common/code_control_flags.go b/protocol/common/code_control_flags.go index 4a99a59f36f..d726473c5a0 100644 --- a/protocol/common/code_control_flags.go +++ b/protocol/common/code_control_flags.go @@ -8,4 +8,6 @@ type CodeControlFlags struct { // CuratedCommunitiesUpdateLoopEnabled indicates whether we should disable the curated communities update loop. // Usually should be disabled in tests. CuratedCommunitiesUpdateLoopEnabled bool + // Store node re-fetching interval in seconds + StoreNodeFetchInterval int } diff --git a/protocol/messenger.go b/protocol/messenger.go index edcb397ec2d..bc3db80eac0 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -158,6 +158,8 @@ type Messenger struct { once sync.Once } + storeSchedulerCh chan struct{} + connectionState connection.State telemetryClient *telemetry.Client contractMaker *contracts.ContractMaker @@ -557,6 +559,7 @@ func NewMessenger( communityStorenodes: storenodes.NewCommunityStorenodes(storenodes.NewDB(database), logger), account: c.account, quit: make(chan struct{}), + storeSchedulerCh: make(chan struct{}, 10), ctx: ctx, cancel: cancel, importingCommunities: make(map[string]bool), @@ -790,6 +793,7 @@ func (m *Messenger) Start() (*MessengerResponse, error) { m.startSyncSettingsLoop() m.startSettingsChangesLoop() m.startCommunityRekeyLoop() + m.startStoreNodeFetchLoop() if m.config.codeControlFlags.CuratedCommunitiesUpdateLoopEnabled { m.startCuratedCommunitiesUpdateLoop() } @@ -925,7 +929,7 @@ func (m *Messenger) handleConnectionChange(online bool) { // Start fetching messages from store nodes if online && m.config.codeControlFlags.AutoRequestHistoricMessages { - m.asyncRequestAllHistoricMessages() + m.storeSchedulerCh <- struct{}{} } // Update ENS verifier @@ -1886,6 +1890,7 @@ func (m *Messenger) Shutdown() (err error) { close(m.quit) m.cancel() + close(m.storeSchedulerCh) m.shutdownWaitGroup.Wait() for i, task := range m.shutdownTasks { m.logger.Debug("running shutdown task", zap.Int("n", i)) diff --git a/protocol/messenger_config.go b/protocol/messenger_config.go index 6763daeb6ae..c7db977b928 100644 --- a/protocol/messenger_config.go +++ b/protocol/messenger_config.go @@ -129,6 +129,7 @@ func messengerDefaultConfig() config { c.codeControlFlags.AutoRequestHistoricMessages = true c.codeControlFlags.CuratedCommunitiesUpdateLoopEnabled = true + c.codeControlFlags.StoreNodeFetchInterval = 20 return c } diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index eab644f5a42..21197df99e0 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -55,6 +55,31 @@ func (m *Messenger) shouldSync() (bool, error) { return useMailserver, nil } +func (m *Messenger) startStoreNodeFetchLoop() { + // Initial fetch + m.storeSchedulerCh <- struct{}{} + go func() { + for { + select { + case <-m.storeSchedulerCh: + // We ignore the return filter, as + // data is going to be set implicitly by syncFilters func + _, err := m.RequestAllHistoricMessages(false, true) + if err != nil { + m.logger.Error("failed to request historic messages", zap.Error(err)) + } else { + go func() { + // Schedule next fetch in StoreNodeFetchInterval seconds + time.Sleep(time.Duration(m.config.codeControlFlags.StoreNodeFetchInterval) * time.Second) + m.storeSchedulerCh <- struct{}{} + }() + } + case <-m.quit: + return + } + } + }() +} func (m *Messenger) scheduleSyncChat(chat *Chat) (bool, error) { shouldSync, err := m.shouldSync() if err != nil { @@ -376,6 +401,7 @@ func (m *Messenger) RequestAllHistoricMessages(forceFetchingBackup, withRetries allResponses.AddChats(response.Chats()) allResponses.AddMessages(response.Messages()) } + return allResponses, nil } @@ -1034,12 +1060,15 @@ func (m *Messenger) RemoveFilters(filters []*transport.Filter) error { } func (m *Messenger) ConnectionChanged(state connection.State) { + m.logger.Info("### ConnectionChanged", zap.Any("state", state)) m.transport.ConnectionChanged(state) if !m.connectionState.Offline && state.Offline { + m.logger.Info("### ConnectionChanged stop") m.sender.StopDatasync() } if m.connectionState.Offline && !state.Offline { + m.logger.Info("### ConnectionChanged start") err := m.sender.StartDatasync(m.sendDataSync) if err != nil { m.logger.Error("failed to start datasync", zap.Error(err)) diff --git a/protocol/messenger_mailserver_cycle.go b/protocol/messenger_mailserver_cycle.go index f3423e4384e..35554801429 100644 --- a/protocol/messenger_mailserver_cycle.go +++ b/protocol/messenger_mailserver_cycle.go @@ -429,14 +429,7 @@ func (m *Messenger) connectToMailserver(ms mailservers.Mailserver) error { // Query mailserver if m.config.codeControlFlags.AutoRequestHistoricMessages { - go func() { - _, err := m.performMailserverRequest(&ms, func(_ mailservers.Mailserver) (*MessengerResponse, error) { - return m.RequestAllHistoricMessages(false, false) - }) - if err != nil { - m.logger.Error("could not perform mailserver request", zap.Error(err)) - } - }() + m.storeSchedulerCh <- struct{}{} } } } @@ -582,12 +575,7 @@ func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) e } // Query mailserver if m.config.codeControlFlags.AutoRequestHistoricMessages { - go func() { - _, err := m.RequestAllHistoricMessages(false, true) - if err != nil { - m.logger.Error("failed to request historic messages", zap.Error(err)) - } - }() + m.storeSchedulerCh <- struct{}{} } } else { m.mailPeersMutex.Unlock() @@ -629,16 +617,6 @@ func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) e return nil } -func (m *Messenger) asyncRequestAllHistoricMessages() { - m.logger.Debug("asyncRequestAllHistoricMessages") - go func() { - _, err := m.RequestAllHistoricMessages(false, true) - if err != nil { - m.logger.Error("failed to request historic messages", zap.Error(err)) - } - }() -} - func (m *Messenger) updateWakuV1PeerStatus() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop()