Skip to content

Commit

Permalink
swarm/network/stream: package-wide subscriptionFunc
Browse files Browse the repository at this point in the history
  • Loading branch information
holisticode committed Jan 10, 2019
1 parent 689aad2 commit 89d07e1
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
18 changes: 9 additions & 9 deletions swarm/network/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ const (
RetrievalEnabled
)

// subscriptionFunc is used to determine what to do in order to perform subscriptions
// usually we would start to really subscribe to nodes, but for tests other functionality may be needed
// (see TestRequestPeerSubscriptions in streamer_test.go)
var subscriptionFunc func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool = doRequestSubscription

// Registry registry for outgoing and incoming streamer constructors
type Registry struct {
addr enode.ID
Expand All @@ -90,10 +95,6 @@ type Registry struct {
spec *protocols.Spec //this protocol's spec
balance protocols.Balance //implements protocols.Balance, for accounting
prices protocols.Prices //implements protocols.Prices, provides prices to accounting
// the subscriptionFunc is used to determine what to do in order to perform subscriptions
// usually we would start to really subscribe to nodes, but for tests other functionality may be needed
// (see TestRequestPeerSubscriptions in streamer_test.go)
subscriptionFunc func(p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool
}

// RegistryOptions holds optional values for NewRegistry constructor.
Expand Down Expand Up @@ -128,8 +129,7 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
maxPeerServers: options.MaxPeerServers,
balance: balance,
}
//assign the default subscription func: actually do request subscriptions from nodes
streamer.subscriptionFunc = streamer.doRequestSubscription

streamer.setupSpec()

streamer.api = NewAPI(streamer)
Expand Down Expand Up @@ -530,15 +530,15 @@ func (r *Registry) requestPeerSubscriptions(kad *network.Kademlia, subs map[enod

for bin := startPo; bin <= endPo; bin++ {
//do the actual subscription
ok = r.subscriptionFunc(p, uint8(bin), subs)
ok = subscriptionFunc(r, p, uint8(bin), subs)
}
return ok
})
}

// doRequestSubscription sends the actual RequestSubscription to the peer
func (r *Registry) doRequestSubscription(p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool {
log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr, p.ID(), bin))
func doRequestSubscription(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool {
log.Debug("Requesting subscription by registry:", "registry", r.addr, "peer", p.ID(), "bin", bin)
// bin is always less then 256 and it is safe to convert it to type uint8
stream := NewStream("SYNC", FormatSyncBinKey(bin), true)
if streams, ok := subs[p.ID()]; ok {
Expand Down
6 changes: 3 additions & 3 deletions swarm/network/stream/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,9 +1050,11 @@ func TestRequestPeerSubscriptions(t *testing.T) {

// simulate that we would do subscriptions: just store the bin numbers
fakeSubscriptions := make(map[string][]int)
//after the test, we need to reset the subscriptionFunc to the default
defer func() { subscriptionFunc = doRequestSubscription }()
// define the function which should run for each connection
// instead of doing real subscriptions, we just store the bin numbers
requestSubscriptionFunc := func(p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool {
subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool {
// get the peer ID
peerstr := fmt.Sprintf("%x", p.Over())
// create the array of bins per peer
Expand All @@ -1066,8 +1068,6 @@ func TestRequestPeerSubscriptions(t *testing.T) {
}
// create just a simple Registry object in order to be able to call...
r := &Registry{}
// ...the requestPeerSubscriptions function, which contains the logic for subscriptions
r.subscriptionFunc = requestSubscriptionFunc
r.requestPeerSubscriptions(k, nil)
// calculate the kademlia depth
kdepth := k.NeighbourhoodDepth()
Expand Down

0 comments on commit 89d07e1

Please sign in to comment.