diff --git a/waku/v2/api/filter/filter.go b/waku/v2/api/filter/filter.go index 020bb23f5..92659b9c8 100644 --- a/waku/v2/api/filter/filter.go +++ b/waku/v2/api/filter/filter.go @@ -52,6 +52,7 @@ type Sub struct { type subscribeParameters struct { batchInterval time.Duration multiplexChannelBuffer int + preferredPeers peer.IDSlice } type SubscribeOptions func(*subscribeParameters) @@ -75,6 +76,12 @@ func defaultOptions() []SubscribeOptions { } } +func WithPreferredServiceNodes(peers peer.IDSlice) SubscribeOptions { + return func(params *subscribeParameters) { + params.preferredPeers = peers + } +} + // Subscribe func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) { sub := new(Sub) diff --git a/waku/v2/api/filter/filter_manager.go b/waku/v2/api/filter/filter_manager.go index a43c3c396..942ac6852 100644 --- a/waku/v2/api/filter/filter_manager.go +++ b/waku/v2/api/filter/filter_manager.go @@ -2,10 +2,12 @@ package filter import ( "context" + "math/rand" "sync" "time" "github.com/google/uuid" + "github.com/libp2p/go-libp2p/core/peer" "go.uber.org/zap" "golang.org/x/exp/maps" @@ -61,7 +63,8 @@ type EnevelopeProcessor interface { OnNewEnvelope(env *protocol.Envelope) error } -func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager { +func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, + envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager { // This fn is being mocked in test mgr := new(FilterManager) mgr.ctx = ctx @@ -162,6 +165,12 @@ func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) { defer utils.LogOnPanic() ctx, cancel := context.WithCancel(mgr.ctx) config := FilterConfig{MaxPeers: mgr.minPeersPerFilter} + if len(mgr.params.preferredPeers) > 0 { + //use one peer which is from preferred peers. + randomIndex := rand.Intn(len(mgr.params.preferredPeers) - 1) + randomPreferredPeer := mgr.params.preferredPeers[randomIndex] + config.Peers = []peer.ID{randomPreferredPeer} + } sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params) mgr.Lock() mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub} diff --git a/waku/v2/api/filter/filter_test.go b/waku/v2/api/filter/filter_test.go index 8a5f2d408..e16f675dd 100644 --- a/waku/v2/api/filter/filter_test.go +++ b/waku/v2/api/filter/filter_test.go @@ -54,7 +54,7 @@ func (s *FilterApiTestSuite) TestSubscribe() { s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic) ctx, cancel := context.WithCancel(context.Background()) s.Log.Info("About to perform API Subscribe()") - params := subscribeParameters{300 * time.Second, 1024} + params := subscribeParameters{300 * time.Second, 1024, nil} apiSub, err := Subscribe(ctx, s.LightNode, contentFilter, apiConfig, s.Log, ¶ms) s.Require().NoError(err) s.Require().Equal(apiSub.ContentFilter, contentFilter)