Skip to content

Commit

Permalink
fix_: scheduler store node fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
vitvly committed Jun 5, 2024
1 parent dfdc165 commit eb89ca1
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 25 deletions.
2 changes: 2 additions & 0 deletions protocol/common/code_control_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ type CodeControlFlags struct {
// CuratedCommunitiesUpdateLoopEnabled indicates whether we should disable the curated communities update loop.
// Usually should be disabled in tests.
CuratedCommunitiesUpdateLoopEnabled bool
// Store node re-fetching interval in seconds
StoreNodeFetchInterval int
}
7 changes: 6 additions & 1 deletion protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ type Messenger struct {
once sync.Once
}

storeSchedulerCh chan struct{}

connectionState connection.State
telemetryClient *telemetry.Client
contractMaker *contracts.ContractMaker
Expand Down Expand Up @@ -557,6 +559,7 @@ func NewMessenger(
communityStorenodes: storenodes.NewCommunityStorenodes(storenodes.NewDB(database), logger),
account: c.account,
quit: make(chan struct{}),
storeSchedulerCh: make(chan struct{}, 10),
ctx: ctx,
cancel: cancel,
importingCommunities: make(map[string]bool),
Expand Down Expand Up @@ -790,6 +793,7 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
m.startSyncSettingsLoop()
m.startSettingsChangesLoop()
m.startCommunityRekeyLoop()
m.startStoreNodeFetchLoop()
if m.config.codeControlFlags.CuratedCommunitiesUpdateLoopEnabled {
m.startCuratedCommunitiesUpdateLoop()
}
Expand Down Expand Up @@ -925,7 +929,7 @@ func (m *Messenger) handleConnectionChange(online bool) {

// Start fetching messages from store nodes
if online && m.config.codeControlFlags.AutoRequestHistoricMessages {
m.asyncRequestAllHistoricMessages()
m.storeSchedulerCh <- struct{}{}
}

// Update ENS verifier
Expand Down Expand Up @@ -1886,6 +1890,7 @@ func (m *Messenger) Shutdown() (err error) {

close(m.quit)
m.cancel()
close(m.storeSchedulerCh)
m.shutdownWaitGroup.Wait()
for i, task := range m.shutdownTasks {
m.logger.Debug("running shutdown task", zap.Int("n", i))
Expand Down
1 change: 1 addition & 0 deletions protocol/messenger_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func messengerDefaultConfig() config {

c.codeControlFlags.AutoRequestHistoricMessages = true
c.codeControlFlags.CuratedCommunitiesUpdateLoopEnabled = true
c.codeControlFlags.StoreNodeFetchInterval = 20
return c
}

Expand Down
29 changes: 29 additions & 0 deletions protocol/messenger_mailserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,31 @@ func (m *Messenger) shouldSync() (bool, error) {
return useMailserver, nil
}

func (m *Messenger) startStoreNodeFetchLoop() {
// Initial fetch
m.storeSchedulerCh <- struct{}{}
go func() {
for {
select {
case <-m.storeSchedulerCh:
// We ignore the return filter, as
// data is going to be set implicitly by syncFilters func
_, err := m.RequestAllHistoricMessages(false, true)
if err != nil {
m.logger.Error("failed to request historic messages", zap.Error(err))
} else {
go func() {
// Schedule next fetch in StoreNodeFetchInterval seconds
time.Sleep(time.Duration(m.config.codeControlFlags.StoreNodeFetchInterval) * time.Second)
m.storeSchedulerCh <- struct{}{}
}()
}
case <-m.quit:
return
}
}
}()
}
func (m *Messenger) scheduleSyncChat(chat *Chat) (bool, error) {
shouldSync, err := m.shouldSync()
if err != nil {
Expand Down Expand Up @@ -376,6 +401,7 @@ func (m *Messenger) RequestAllHistoricMessages(forceFetchingBackup, withRetries
allResponses.AddChats(response.Chats())
allResponses.AddMessages(response.Messages())
}

return allResponses, nil
}

Expand Down Expand Up @@ -1034,12 +1060,15 @@ func (m *Messenger) RemoveFilters(filters []*transport.Filter) error {
}

func (m *Messenger) ConnectionChanged(state connection.State) {
m.logger.Info("### ConnectionChanged", zap.Any("state", state))
m.transport.ConnectionChanged(state)
if !m.connectionState.Offline && state.Offline {
m.logger.Info("### ConnectionChanged stop")
m.sender.StopDatasync()
}

if m.connectionState.Offline && !state.Offline {
m.logger.Info("### ConnectionChanged start")
err := m.sender.StartDatasync(m.sendDataSync)
if err != nil {
m.logger.Error("failed to start datasync", zap.Error(err))
Expand Down
26 changes: 2 additions & 24 deletions protocol/messenger_mailserver_cycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,14 +429,7 @@ func (m *Messenger) connectToMailserver(ms mailservers.Mailserver) error {

// Query mailserver
if m.config.codeControlFlags.AutoRequestHistoricMessages {
go func() {
_, err := m.performMailserverRequest(&ms, func(_ mailservers.Mailserver) (*MessengerResponse, error) {
return m.RequestAllHistoricMessages(false, false)
})
if err != nil {
m.logger.Error("could not perform mailserver request", zap.Error(err))
}
}()
m.storeSchedulerCh <- struct{}{}
}
}
}
Expand Down Expand Up @@ -582,12 +575,7 @@ func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) e
}
// Query mailserver
if m.config.codeControlFlags.AutoRequestHistoricMessages {
go func() {
_, err := m.RequestAllHistoricMessages(false, true)
if err != nil {
m.logger.Error("failed to request historic messages", zap.Error(err))
}
}()
m.storeSchedulerCh <- struct{}{}
}
} else {
m.mailPeersMutex.Unlock()
Expand Down Expand Up @@ -629,16 +617,6 @@ func (m *Messenger) handleMailserverCycleEvent(connectedPeers []ConnectedPeer) e
return nil
}

func (m *Messenger) asyncRequestAllHistoricMessages() {
m.logger.Debug("asyncRequestAllHistoricMessages")
go func() {
_, err := m.RequestAllHistoricMessages(false, true)
if err != nil {
m.logger.Error("failed to request historic messages", zap.Error(err))
}
}()
}

func (m *Messenger) updateWakuV1PeerStatus() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
Expand Down

0 comments on commit eb89ca1

Please sign in to comment.