From 0b1bd5fd572ed957a724c82b6752f46ad1440568 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 2 Oct 2018 12:51:17 +0200 Subject: [PATCH] coreapi pubsub: better ctx for connectToPubSubPeers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Ɓukasz Magiera --- core/coreapi/pubsub.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/core/coreapi/pubsub.go b/core/coreapi/pubsub.go index f24aba69b32..fd07b0e64b4 100644 --- a/core/coreapi/pubsub.go +++ b/core/coreapi/pubsub.go @@ -20,6 +20,7 @@ import ( type PubSubAPI CoreAPI type pubSubSubscription struct { + cancel context.CancelFunc subscription *floodsub.Subscription } @@ -75,19 +76,21 @@ func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt return nil, err } + pubctx, cancel := context.WithCancel(api.node.Context()) + if options.Discover { go func() { - blk, err := api.core().Block().Put(ctx, strings.NewReader("floodsub:"+topic)) + blk, err := api.core().Block().Put(pubctx, strings.NewReader("floodsub:"+topic)) if err != nil { log.Error("pubsub discovery: ", err) return } - connectToPubSubPeers(ctx, api.node, blk.Path().Cid()) + connectToPubSubPeers(pubctx, api.node, blk.Path().Cid()) }() } - return &pubSubSubscription{sub}, nil + return &pubSubSubscription{cancel, sub}, nil } func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid cid.Cid) { @@ -95,7 +98,7 @@ func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid cid.Cid) { defer cancel() provs := n.Routing.FindProvidersAsync(ctx, cid, 10) - wg := &sync.WaitGroup{} + var wg sync.WaitGroup for p := range provs { wg.Add(1) go func(pi pstore.PeerInfo) { @@ -127,6 +130,7 @@ func (api *PubSubAPI) checkNode() error { } func (sub *pubSubSubscription) Close() error { + sub.cancel() sub.subscription.Cancel() return nil }