diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go index 927ffb9c93..981cb7714a 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go @@ -41,7 +41,7 @@ type MissingMessageVerifier struct { storenodeRequestor common.StorenodeRequestor messageTracker MessageTracker - criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages + criteriaInterest map[string]*criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages criteriaInterestMu sync.RWMutex C chan *protocol.Envelope @@ -66,7 +66,7 @@ func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, mes messageTracker: messageTracker, logger: logger.Named("missing-msg-verifier"), params: params, - criteriaInterest: make(map[string]criteriaInterest), + criteriaInterest: make(map[string]*criteriaInterest), C: make(chan *protocol.Envelope, 1000), } } @@ -99,7 +99,7 @@ func (m *MissingMessageVerifier) SetCriteriaInterest(peerID peer.ID, contentFilt currMessageVerificationRequest.cancel() } - m.criteriaInterest[contentFilter.PubsubTopic] = criteriaInterest + m.criteriaInterest[contentFilter.PubsubTopic] = &criteriaInterest } func (m *MissingMessageVerifier) setRunning(running bool) { @@ -121,6 +121,14 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) { m.ctx = ctx m.cancel = cancelFunc + m.criteriaInterestMu.Lock() + for _, value := range m.criteriaInterest { + ctx, cancel := context.WithCancel(m.ctx) + value.ctx = ctx + value.cancel = cancel + } + m.criteriaInterestMu.Unlock() + go func() { defer utils.LogOnPanic() t := time.NewTicker(m.params.interval) @@ -130,11 +138,11 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) { for { select { case <-t.C: - m.logger.Debug("checking for missing messages...") + m.logger.Debug("checking for missing messages...", zap.Int("criteria-len", len(m.criteriaInterest))) m.criteriaInterestMu.RLock() critIntList := make([]criteriaInterest, 0, len(m.criteriaInterest)) for _, value := range m.criteriaInterest { - critIntList = append(critIntList, value) + critIntList = append(critIntList, *value) } m.criteriaInterestMu.RUnlock() for _, interest := range critIntList { @@ -146,7 +154,9 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) { semaphore <- struct{}{} go func(interest criteriaInterest) { defer utils.LogOnPanic() + m.logger.Debug("fetching history ", zap.Stringer("content-filter", interest.contentFilter), zap.Time("last-checked", interest.lastChecked)) m.fetchHistory(m.C, interest) + m.logger.Debug("finished fetching history ", zap.Stringer("content-filter", interest.contentFilter), zap.Time("last-checked", interest.lastChecked)) <-semaphore }(interest) }