Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(discovery)!: discover peers by tag #2730

Merged
merged 8 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nodebuilder/share/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Config struct {
PeerManagerParams peers.Parameters

LightAvailability light.Parameters `toml:",omitempty"`
Discovery discovery.Parameters
Discovery *discovery.Parameters
}

func DefaultConfig(tp node.Type) Config {
Expand Down
17 changes: 13 additions & 4 deletions nodebuilder/share/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,27 @@ import (
"github.com/celestiaorg/celestia-node/share/getters"
"github.com/celestiaorg/celestia-node/share/ipld"
disc "github.com/celestiaorg/celestia-node/share/p2p/discovery"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
)

func newDiscovery(cfg Config) func(routing.ContentRouting, host.Host) *disc.Discovery {
const (
// fullNodesTag is the tag used to identify full nodes in the discovery service.
fullNodesTag = "full"
)

func newDiscovery(cfg *disc.Parameters,
) func(routing.ContentRouting, host.Host, *peers.Manager) (*disc.Discovery, error) {
return func(
r routing.ContentRouting,
h host.Host,
) *disc.Discovery {
manager *peers.Manager,
) (*disc.Discovery, error) {
return disc.NewDiscovery(
cfg,
h,
routingdisc.NewRoutingDiscovery(r),
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
disc.WithPeersLimit(cfg.Discovery.PeersLimit),
disc.WithAdvertiseInterval(cfg.Discovery.AdvertiseInterval),
fullNodesTag,
disc.WithOnPeersUpdate(manager.UpdateFullNodePool),
)
}
}
Expand Down
4 changes: 1 addition & 3 deletions nodebuilder/share/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
fx.Provide(newModule),
fx.Invoke(func(disc *disc.Discovery) {}),
fx.Provide(fx.Annotate(
newDiscovery(*cfg),
newDiscovery(cfg.Discovery),
fx.OnStart(func(ctx context.Context, d *disc.Discovery) error {
return d.Start(ctx)
}),
Expand Down Expand Up @@ -147,7 +147,6 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
fx.Provide(
func(
params peers.Parameters,
discovery *disc.Discovery,
host host.Host,
connGater *conngater.BasicConnectionGater,
shrexSub *shrexsub.PubSub,
Expand All @@ -158,7 +157,6 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
) (*peers.Manager, error) {
return peers.NewManager(
params,
discovery,
host,
connGater,
peers.WithShrexSubPools(shrexSub, headerSub),
Expand Down
2 changes: 0 additions & 2 deletions nodebuilder/tests/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,7 @@ func TestRestartNodeDiscovery(t *testing.T) {
sw.Disconnect(t, nodes[0], nodes[1])

// create and start one more FN with disabled discovery
fullCfg.Share.Discovery.PeersLimit = 0
disabledDiscoveryFN := sw.NewNodeWithConfig(node.Full, fullCfg, nodesConfig)
err = disabledDiscoveryFN.Start(ctx)
require.NoError(t, err)

// ensure that the FN with disabled discovery is discovered by both of the
Expand Down
10 changes: 7 additions & 3 deletions share/availability/full/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,16 @@ func Node(dn *availability_test.TestDagNet) *availability_test.TestNode {
}

func TestAvailability(t *testing.T, getter share.Getter) *ShareAvailability {
disc := discovery.NewDiscovery(
params := discovery.DefaultParameters()
params.AdvertiseInterval = time.Second
params.PeersLimit = 10
disc, err := discovery.NewDiscovery(
params,
nil,
routing.NewRoutingDiscovery(routinghelpers.Null{}),
discovery.WithAdvertiseInterval(time.Second),
discovery.WithPeersLimit(10),
"full",
)
require.NoError(t, err)
store, err := eds.NewStore(eds.DefaultParameters(), t.TempDir(), datastore.NewMapDatastore())
require.NoError(t, err)
err = store.Start(context.Background())
Expand Down
9 changes: 0 additions & 9 deletions share/getters/shrex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ import (

"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"github.com/libp2p/go-libp2p/core/host"
routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"
Expand All @@ -25,7 +23,6 @@ import (
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/eds/edstest"
"github.com/celestiaorg/celestia-node/share/p2p/discovery"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
Expand Down Expand Up @@ -210,18 +207,12 @@ func testManager(
return nil, err
}

disc := discovery.NewDiscovery(nil,
routingdisc.NewRoutingDiscovery(routinghelpers.Null{}),
discovery.WithPeersLimit(10),
discovery.WithAdvertiseInterval(time.Second),
)
connGater, err := conngater.NewBasicConnectionGater(ds_sync.MutexWrap(datastore.NewMapDatastore()))
if err != nil {
return nil, err
}
manager, err := peers.NewManager(
peers.DefaultParameters(),
disc,
host,
connGater,
peers.WithShrexSubPools(shrexSub, headerSub),
Expand Down
64 changes: 28 additions & 36 deletions share/p2p/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ import (
var log = logging.Logger("share/discovery")

const (
// rendezvousPoint is the namespace where peers advertise and discover each other.
rendezvousPoint = "full"

// eventbusBufSize is the size of the buffered channel to handle
// events in libp2p. We specify a larger buffer size for the channel
// to avoid overflowing and blocking subscription during disconnection bursts.
Expand All @@ -45,6 +42,8 @@ var discoveryRetryTimeout = retryTimeout
// Discovery combines advertise and discover services and allows to store discovered nodes.
// TODO: The code here gets horribly hairy, so we should refactor this at some point
type Discovery struct {
// Tag is used as rondezvous point for discovery service
tag string
set *limitedSet
host host.Host
disc discovery.Discovery
Expand All @@ -58,43 +57,50 @@ type Discovery struct {

cancel context.CancelFunc

params Parameters
params *Parameters
}

type OnUpdatedPeers func(peerID peer.ID, isAdded bool)

func (f OnUpdatedPeers) add(next OnUpdatedPeers) OnUpdatedPeers {
return func(peerID peer.ID, isAdded bool) {
f(peerID, isAdded)
next(peerID, isAdded)
}
}

// NewDiscovery constructs a new discovery.
func NewDiscovery(
params *Parameters,
h host.Host,
d discovery.Discovery,
tag string,
opts ...Option,
) *Discovery {
params := DefaultParameters()

for _, opt := range opts {
opt(&params)
) (*Discovery, error) {
if err := params.Validate(); err != nil {
return nil, err
}

if tag == "" {
return nil, fmt.Errorf("discovery: tag cannot be empty")
}
o := newOptions(opts...)
return &Discovery{
tag: tag,
set: newLimitedSet(params.PeersLimit),
host: h,
disc: d,
connector: newBackoffConnector(h, defaultBackoffFactory),
onUpdatedPeers: func(peer.ID, bool) {},
onUpdatedPeers: o.onUpdatedPeers,
params: params,
triggerDisc: make(chan struct{}),
}
}, nil
}

func (d *Discovery) Start(context.Context) error {
ctx, cancel := context.WithCancel(context.Background())
d.cancel = cancel

if d.params.PeersLimit == 0 {
log.Warn("peers limit is set to 0. Skipping discovery...")
return nil
}

sub, err := d.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(eventbusBufSize))
if err != nil {
return fmt.Errorf("subscribing for connection events: %w", err)
Expand All @@ -111,15 +117,6 @@ func (d *Discovery) Stop(context.Context) error {
return nil
}

// WithOnPeersUpdate chains OnPeersUpdate callbacks on every update of discovered peers list.
func (d *Discovery) WithOnPeersUpdate(f OnUpdatedPeers) {
prev := d.onUpdatedPeers
d.onUpdatedPeers = func(peerID peer.ID, isAdded bool) {
prev(peerID, isAdded)
f(peerID, isAdded)
}
}

// Peers provides a list of discovered peers in the "full" topic.
// If Discovery hasn't found any peers, it blocks until at least one peer is found.
func (d *Discovery) Peers(ctx context.Context) ([]peer.ID, error) {
Expand All @@ -133,7 +130,7 @@ func (d *Discovery) Discard(id peer.ID) bool {
return false
}

d.host.ConnManager().Unprotect(id, rendezvousPoint)
walldiss marked this conversation as resolved.
Show resolved Hide resolved
d.host.ConnManager().Unprotect(id, d.tag)
d.connector.Backoff(id)
d.set.Remove(id)
d.onUpdatedPeers(id, false)
Expand All @@ -153,21 +150,16 @@ func (d *Discovery) Discard(id peer.ID) bool {
// Advertise is a utility function that persistently advertises a service through an Advertiser.
// TODO: Start advertising only after the reachability is confirmed by AutoNAT
func (d *Discovery) Advertise(ctx context.Context) {
if d.params.AdvertiseInterval == -1 {
log.Warn("AdvertiseInterval is set to -1. Skipping advertising...")
return
}

timer := time.NewTimer(d.params.AdvertiseInterval)
defer timer.Stop()
for {
_, err := d.disc.Advertise(ctx, rendezvousPoint)
_, err := d.disc.Advertise(ctx, d.tag)
d.metrics.observeAdvertise(ctx, err)
if err != nil {
if ctx.Err() != nil {
return
}
log.Warnw("error advertising", "rendezvous", rendezvousPoint, "err", err)
log.Warnw("error advertising", "rendezvous", d.tag, "err", err)

// we don't want retry indefinitely in busy loop
// internal discovery mechanism may need some time before attempts
Expand Down Expand Up @@ -280,7 +272,7 @@ func (d *Discovery) discover(ctx context.Context) bool {
findCancel()
}()

peers, err := d.disc.FindPeers(findCtx, rendezvousPoint)
peers, err := d.disc.FindPeers(findCtx, d.tag)
if err != nil {
log.Error("unable to start discovery", "err", err)
return false
Expand Down Expand Up @@ -371,11 +363,11 @@ func (d *Discovery) handleDiscoveredPeer(ctx context.Context, peer peer.AddrInfo
d.metrics.observeHandlePeer(ctx, handlePeerConnected)
logger.Debug("added peer to set")

// tag to protect peer from being killed by ConnManager
// Tag to protect peer from being killed by ConnManager
// NOTE: This is does not protect from remote killing the connection.
// In the future, we should design a protocol that keeps bidirectional agreement on whether
// connection should be kept or not, similar to mesh link in GossipSub.
d.host.ConnManager().Protect(peer.ID, rendezvousPoint)
d.host.ConnManager().Protect(peer.ID, d.tag)
return true
}

Expand Down
Loading
Loading