diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go
index 958dd8763b..d89a75f150 100644
--- a/p2p/host/basic/basic_host.go
+++ b/p2p/host/basic/basic_host.go
@@ -75,7 +75,7 @@ type BasicHost struct {
 	network network.Network
 	mux     *msmux.MultistreamMuxer
-	ids     *identify.IDService
+	ids     identify.IDService
 	hps     *holepunch.Service
 	pings   *ping.PingService
 	natmgr  NATManager
@@ -542,7 +542,7 @@ func (h *BasicHost) Mux() protocol.Switch {
 }
 
 // IDService returns
-func (h *BasicHost) IDService() *identify.IDService {
+func (h *BasicHost) IDService() identify.IDService {
 	return h.ids
 }
 
diff --git a/p2p/protocol/holepunch/coordination.go b/p2p/protocol/holepunch/coordination.go
index cbf5a7f778..fd0c445298 100644
--- a/p2p/protocol/holepunch/coordination.go
+++ b/p2p/protocol/holepunch/coordination.go
@@ -47,7 +47,7 @@ type Service struct {
 	ctx       context.Context
 	ctxCancel context.CancelFunc
 
-	ids  *identify.IDService
+	ids  identify.IDService
 	host host.Host
 
 	tracer *tracer
@@ -56,6 +56,8 @@ type Service struct {
 	closed   bool
 	refCount sync.WaitGroup
 
+	hasPublicAddrsChan chan struct{} // this chan is closed as soon as we have a public address
+
 	// active hole punches for deduplicating
 	activeMx sync.Mutex
 	active   map[peer.ID]struct{}
@@ -64,18 +66,19 @@ type Service struct {
 type Option func(*Service) error
 
 // NewService creates a new service that can be used for hole punching
-func NewService(h host.Host, ids *identify.IDService, opts ...Option) (*Service, error) {
+func NewService(h host.Host, ids identify.IDService, opts ...Option) (*Service, error) {
 	if ids == nil {
 		return nil, errors.New("identify service can't be nil")
 	}
 
 	ctx, cancel := context.WithCancel(context.Background())
 	hs := &Service{
-		ctx:       ctx,
-		ctxCancel: cancel,
-		host:      h,
-		ids:       ids,
-		active:    make(map[peer.ID]struct{}),
+		ctx:                ctx,
+		ctxCancel:          cancel,
+		host:               h,
+		ids:                ids,
+		active:             make(map[peer.ID]struct{}),
+		hasPublicAddrsChan: make(chan struct{}),
 	}
 
 	for _, opt := range opts {
@@ -85,11 +88,47 @@ func NewService(h host.Host, ids *identify.IDService, opts ...Option) (*Service,
 		}
 	}
 
-	h.SetStreamHandler(Protocol, hs.handleNewStream)
+	hs.refCount.Add(1)
+	go hs.watchForPublicAddr()
+
 	h.Network().Notify((*netNotifiee)(hs))
 	return hs, nil
 }
 
+func (hs *Service) watchForPublicAddr() {
+	defer hs.refCount.Done()
+
+	log.Debug("waiting until we have at least one public address", "peer", hs.host.ID())
+
+	// TODO: We should have an event here that fires when identify discovers a new
+	// address (and when autonat confirms that address).
+	// As we currently don't have an event like this, just check our observed addresses
+	// regularly (exponential backoff starting at 250 ms, capped at 5s).
+	duration := 250 * time.Millisecond
+	const maxDuration = 5 * time.Second
+	t := time.NewTimer(duration)
+	defer t.Stop()
+	for {
+		if containsPublicAddr(hs.ids.OwnObservedAddrs()) {
+			log.Debug("Host now has a public address. Starting holepunch protocol.")
+			hs.host.SetStreamHandler(Protocol, hs.handleNewStream)
+			close(hs.hasPublicAddrsChan)
+			return
+		}
+
+		select {
+		case <-hs.ctx.Done():
+			return
+		case <-t.C:
+			duration *= 2
+			if duration > maxDuration {
+				duration = maxDuration
+			}
+			t.Reset(duration)
+		}
+	}
+}
+
 // Close closes the Hole Punch Service.
 func (hs *Service) Close() error {
 	hs.closeMx.Lock()
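The new watchForPublicAddr goroutine is a plain poll-with-backoff loop: check a predicate, and if it isn't true yet, wait for an exponentially growing interval (250 ms doubling, capped at 5 s) until the service context is cancelled. Below is a minimal, self-contained sketch of that pattern; waitUntil and haveAddr are illustrative stand-ins for the service's containsPublicAddr(hs.ids.OwnObservedAddrs()) check and are not part of the patch.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitUntil polls check with exponential backoff (250ms doubling up to 5s),
// mirroring the watchForPublicAddr loop above. It returns true once check
// succeeds, or false when ctx is cancelled.
func waitUntil(ctx context.Context, check func() bool) bool {
	duration := 250 * time.Millisecond
	const maxDuration = 5 * time.Second
	t := time.NewTimer(duration)
	defer t.Stop()
	for {
		if check() {
			return true
		}
		select {
		case <-ctx.Done():
			return false
		case <-t.C:
			duration *= 2
			if duration > maxDuration {
				duration = maxDuration
			}
			t.Reset(duration)
		}
	}
}

func main() {
	// Pretend the "public address" shows up after roughly 600ms.
	start := time.Now()
	haveAddr := func() bool { return time.Since(start) > 600*time.Millisecond }

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	fmt.Println("got public address:", waitUntil(ctx, haveAddr))
}
```

Using a single timer that is reset, rather than sleeping, keeps the loop responsive to cancellation while it backs off.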
@@ -119,7 +158,7 @@ func (hs *Service) initiateHolePunch(rp peer.ID) ([]ma.Multiaddr, time.Duration,
 
 	// send a CONNECT and start RTT measurement.
 	msg := &pb.HolePunch{
 		Type:     pb.HolePunch_CONNECT.Enum(),
-		ObsAddrs: addrsToBytes(hs.ids.OwnObservedAddrs()),
+		ObsAddrs: addrsToBytes(removeRelayAddrs(hs.ids.OwnObservedAddrs())),
 	}
 	start := time.Now()
@@ -141,8 +180,10 @@ func (hs *Service) initiateHolePunch(rp peer.ID) ([]ma.Multiaddr, time.Duration,
 		str.Reset()
 		return nil, 0, fmt.Errorf("expect CONNECT message, got %s", t)
 	}
-
-	addrs := addrsFromBytes(msg.ObsAddrs)
+	addrs := removeRelayAddrs(addrsFromBytes(msg.ObsAddrs))
+	if len(addrs) == 0 {
+		str.Reset()
+	}
 
 	msg.Reset()
 	msg.Type = pb.HolePunch_SYNC.Enum()
@@ -174,7 +215,6 @@ func (hs *Service) beginDirectConnect(p peer.ID) error {
 // It first attempts a direct dial (if we have a public address of that peer), and then
 // coordinates a hole punch over the given relay connection.
 func (hs *Service) DirectConnect(p peer.ID) error {
-	log.Debugw("got inbound proxy conn", "peer", p)
 	if err := hs.beginDirectConnect(p); err != nil {
 		return err
 	}
@@ -219,6 +259,18 @@ func (hs *Service) directConnect(rp peer.ID) error {
 		}
 	}
 
+	log.Debugw("got inbound proxy conn", "peer", rp)
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	select {
+	case <-hs.ctx.Done():
+		return hs.ctx.Err()
+	case <-ctx.Done():
+		log.Debug("didn't find any public host address")
+		return errors.New("can't initiate hole punch, as we don't have any public addresses")
+	case <-hs.hasPublicAddrsChan:
+	}
+
 	// hole punch
 	for i := 0; i < maxRetries; i++ {
 		addrs, rtt, err := hs.initiateHolePunch(rp)
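directConnect now blocks until the service has observed a public address, bounded by a 10-second per-attempt timeout and the service's own shutdown context; the three-way select above is the core of that gating. Here is a small stand-alone sketch of the same pattern, with readyGate standing in for the Service and its hasPublicAddrsChan (the names are illustrative, not from the patch). The real service closes the channel from the single watchForPublicAddr goroutine, so it does not need the sync.Once used here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// readyGate mimics hasPublicAddrsChan: a channel that is closed exactly once
// when some precondition (here: "we have a public address") becomes true.
type readyGate struct {
	once sync.Once
	ch   chan struct{}
}

func newReadyGate() *readyGate { return &readyGate{ch: make(chan struct{})} }

func (g *readyGate) setReady() { g.once.Do(func() { close(g.ch) }) }

// wait blocks until the gate is ready, the service context is cancelled,
// or the per-call timeout expires -- the same three-way select used in
// directConnect above.
func (g *readyGate) wait(svcCtx context.Context, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	select {
	case <-svcCtx.Done():
		return svcCtx.Err()
	case <-ctx.Done():
		return errors.New("can't initiate hole punch, as we don't have any public addresses")
	case <-g.ch:
		return nil
	}
}

func main() {
	g := newReadyGate()
	go func() {
		time.Sleep(200 * time.Millisecond) // address discovered a little later
		g.setReady()
	}()
	fmt.Println(g.wait(context.Background(), 10*time.Second)) // <nil>
}
```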
@@ -260,6 +312,11 @@ func (hs *Service) incomingHolePunch(s network.Stream) (rtt time.Duration, addrs
 	if !isRelayAddress(s.Conn().RemoteMultiaddr()) {
 		return 0, nil, fmt.Errorf("received hole punch stream: %s", s.Conn().RemoteMultiaddr())
 	}
+	ownAddrs := removeRelayAddrs(hs.ids.OwnObservedAddrs())
+	// If we can't tell the peer where to dial us, there's no point in starting the hole punching.
+	if len(ownAddrs) == 0 {
+		return 0, nil, errors.New("rejecting hole punch request, as we don't have any public addresses")
+	}
 
 	s.SetDeadline(time.Now().Add(StreamTimeout))
 	wr := protoio.NewDelimitedWriter(s)
@@ -273,13 +330,16 @@
 	if t := msg.GetType(); t != pb.HolePunch_CONNECT {
 		return 0, nil, fmt.Errorf("expected CONNECT message from initiator but got %d", t)
 	}
-	obsDial := addrsFromBytes(msg.ObsAddrs)
+	obsDial := removeRelayAddrs(addrsFromBytes(msg.ObsAddrs))
 	log.Debugw("received hole punch request", "peer", s.Conn().RemotePeer(), "addrs", obsDial)
+	if len(obsDial) == 0 {
+		return 0, nil, errors.New("expected CONNECT message to contain at least one address")
+	}
 
 	// Write CONNECT message
 	msg.Reset()
 	msg.Type = pb.HolePunch_CONNECT.Enum()
-	msg.ObsAddrs = addrsToBytes(hs.ids.OwnObservedAddrs())
+	msg.ObsAddrs = addrsToBytes(ownAddrs)
 	tstart := time.Now()
 	if err := wr.WriteMsg(msg); err != nil {
 		return 0, nil, fmt.Errorf("failed to write CONNECT message to initator: %w", err)
@@ -327,11 +387,6 @@ func (hs *Service) handleNewStream(s network.Stream) {
 	err = hs.holePunchConnect(pi, false)
 	dt := time.Since(start)
 	hs.tracer.EndHolePunch(rp, dt, err)
-	if err != nil {
-		log.Debugw("hole punching failed", "peer", rp, "time", dt, "error", err)
-	} else {
-		log.Debugw("hole punching succeeded", "peer", rp, "time", dt)
-	}
 }
 
 func (hs *Service) holePunchConnect(pi peer.AddrInfo, isClient bool) error {
@@ -349,6 +404,26 @@ func (hs *Service) holePunchConnect(pi peer.AddrInfo, isClient bool) error {
 	return nil
 }
 
+func containsPublicAddr(addrs []ma.Multiaddr) bool {
+	for _, addr := range addrs {
+		if isRelayAddress(addr) || !manet.IsPublicAddr(addr) {
+			continue
+		}
+		return true
+	}
+	return false
+}
+
+func removeRelayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
+	result := make([]ma.Multiaddr, 0, len(addrs))
+	for _, addr := range addrs {
+		if !isRelayAddress(addr) {
+			result = append(result, addr)
+		}
+	}
+	return result
+}
+
 func isRelayAddress(a ma.Multiaddr) bool {
 	_, err := a.ValueForProtocol(ma.P_CIRCUIT)
 	return err == nil
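The address helpers added above are small enough to try in isolation. The sketch below reproduces roughly the same filtering logic directly against go-multiaddr, so the behaviour on relayed (/p2p-circuit), private, and public addresses can be checked; the sample addresses are made up.

```go
package main

import (
	"fmt"

	ma "github.com/multiformats/go-multiaddr"
	manet "github.com/multiformats/go-multiaddr/net"
)

// isRelayAddress follows the helper above: a relay address is anything
// containing a /p2p-circuit component.
func isRelayAddress(a ma.Multiaddr) bool {
	_, err := a.ValueForProtocol(ma.P_CIRCUIT)
	return err == nil
}

// removeRelayAddrs keeps only directly dialable addresses.
func removeRelayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
	result := make([]ma.Multiaddr, 0, len(addrs))
	for _, addr := range addrs {
		if !isRelayAddress(addr) {
			result = append(result, addr)
		}
	}
	return result
}

// containsPublicAddr reports whether any address is both non-relayed and public.
func containsPublicAddr(addrs []ma.Multiaddr) bool {
	for _, addr := range addrs {
		if !isRelayAddress(addr) && manet.IsPublicAddr(addr) {
			return true
		}
	}
	return false
}

func main() {
	addrs := []ma.Multiaddr{
		ma.StringCast("/ip4/1.2.3.4/tcp/4001"),             // public, direct
		ma.StringCast("/ip4/127.0.0.1/tcp/4001"),           // private, direct
		ma.StringCast("/ip4/5.6.7.8/tcp/4001/p2p-circuit"), // relayed
	}
	fmt.Println(removeRelayAddrs(addrs))   // drops the /p2p-circuit address
	fmt.Println(containsPublicAddr(addrs)) // true, because of 1.2.3.4
}
```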
@@ -390,6 +465,7 @@ func (nn *netNotifiee) Connected(_ network.Network, conn network.Conn) {
 		// that we can dial to for a hole punch.
 		case <-hs.ids.IdentifyWait(conn):
 		case <-hs.ctx.Done():
+			return
 		}
 
 		_ = hs.DirectConnect(conn.RemotePeer())
diff --git a/p2p/protocol/holepunch/coordination_test.go b/p2p/protocol/holepunch/coordination_test.go
index 4786e3416d..da0db1ae19 100644
--- a/p2p/protocol/holepunch/coordination_test.go
+++ b/p2p/protocol/holepunch/coordination_test.go
@@ -47,6 +47,22 @@ func (m *mockEventTracer) getEvents() []*holepunch.Event {
 
 var _ holepunch.EventTracer = &mockEventTracer{}
 
+type mockIDService struct {
+	identify.IDService
+}
+
+var _ identify.IDService = &mockIDService{}
+
+func newMockIDService(t *testing.T, h host.Host) identify.IDService {
+	ids, err := identify.NewIDService(h)
+	require.NoError(t, err)
+	return &mockIDService{IDService: ids}
+}
+
+func (s *mockIDService) OwnObservedAddrs() []ma.Multiaddr {
+	return append(s.IDService.OwnObservedAddrs(), ma.StringCast("/ip4/1.1.1.1/tcp/1234"))
+}
+
 func TestNoHolePunchIfDirectConnExists(t *testing.T) {
 	tr := &mockEventTracer{}
 	h1, hps := mkHostWithHolePunchSvc(t, holepunch.WithTracer(tr))
@@ -95,7 +111,7 @@ func TestDirectDialWorks(t *testing.T) {
 
 func TestEndToEndSimConnect(t *testing.T) {
 	tr := &mockEventTracer{}
-	h1, h2, relay, _ := makeRelayedHosts(t, holepunch.WithTracer(tr), true)
+	h1, h2, relay, _ := makeRelayedHosts(t, nil, holepunch.WithTracer(tr), true)
 	defer h1.Close()
 	defer h2.Close()
 	defer relay.Close()
@@ -158,11 +174,11 @@ func TestFailuresOnInitiator(t *testing.T) {
 			}
 
 			tr := &mockEventTracer{}
-			h1, h2, relay, _ := makeRelayedHosts(t, holepunch.WithTracer(tr), false)
+			h1, h2, relay, _ := makeRelayedHosts(t, nil, nil, false)
 			defer h1.Close()
 			defer h2.Close()
 			defer relay.Close()
-			hps := addHolePunchService(t, h2)
+			hps := addHolePunchService(t, h2, holepunch.WithTracer(tr))
 
 			if tc.rhandler != nil {
 				h1.SetStreamHandler(holepunch.Protocol, tc.rhandler)
@@ -180,6 +196,14 @@
 	}
 }
 
+func addrsToBytes(as []ma.Multiaddr) [][]byte {
+	bzs := make([][]byte, 0, len(as))
+	for _, a := range as {
+		bzs = append(bzs, a.Bytes())
+	}
+	return bzs
+}
+
 func TestFailuresOnResponder(t *testing.T) {
 	tcs := map[string]struct {
 		initiator func(s network.Stream)
@@ -192,10 +216,13 @@
 			},
 			errMsg: "expected CONNECT message",
 		},
-		"initiator does NOT send a SYNC message after a Connect message": {
+		"initiator does NOT send a SYNC message after a CONNECT message": {
 			initiator: func(s network.Stream) {
 				w := protoio.NewDelimitedWriter(s)
-				w.WriteMsg(&holepunch_pb.HolePunch{Type: holepunch_pb.HolePunch_CONNECT.Enum()})
+				w.WriteMsg(&holepunch_pb.HolePunch{
+					Type:     holepunch_pb.HolePunch_CONNECT.Enum(),
+					ObsAddrs: addrsToBytes([]ma.Multiaddr{ma.StringCast("/ip4/127.0.0.1/tcp/1234")}),
+				})
 				w.WriteMsg(&holepunch_pb.HolePunch{Type: holepunch_pb.HolePunch_CONNECT.Enum()})
 			},
 			errMsg: "expected SYNC message",
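The mockIDService added at the top of this test file uses a standard Go trick: embed the interface itself in the mock struct, so the mock satisfies every method of identify.IDService for free and only overrides the one it cares about (OwnObservedAddrs, to inject a fake public address). A dependency-free sketch of the same embedding pattern follows, with made-up types.

```go
package main

import "fmt"

// AddrSource is a stand-in for identify.IDService: an interface with more
// methods than the test cares about.
type AddrSource interface {
	OwnObservedAddrs() []string
	Close() error
}

// realSource plays the role of the production implementation.
type realSource struct{}

func (realSource) OwnObservedAddrs() []string { return []string{"/ip4/10.0.0.1/tcp/4001"} }
func (realSource) Close() error               { return nil }

// mockSource embeds the interface, so it automatically satisfies every method
// of AddrSource and only overrides the one method the test wants to control --
// the same way mockIDService layers a fake public address on top of the real
// identify service.
type mockSource struct {
	AddrSource
}

func (m mockSource) OwnObservedAddrs() []string {
	return append(m.AddrSource.OwnObservedAddrs(), "/ip4/1.1.1.1/tcp/1234")
}

func main() {
	var src AddrSource = mockSource{AddrSource: realSource{}}
	fmt.Println(src.OwnObservedAddrs()) // real addr plus the injected fake one
	fmt.Println(src.Close())            // forwarded to the embedded implementation
}
```

Any method the mock does not override is forwarded to the embedded value; calling a method when nothing was embedded would panic, which is why newMockIDService wraps a real identify service rather than leaving the field nil.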
@@ -203,11 +230,22 @@
 		"initiator does NOT reply within hole punch deadline": {
 			holePunchTimeout: 10 * time.Millisecond,
 			initiator: func(s network.Stream) {
-				protoio.NewDelimitedWriter(s).WriteMsg(&holepunch_pb.HolePunch{Type: holepunch_pb.HolePunch_CONNECT.Enum()})
+				protoio.NewDelimitedWriter(s).WriteMsg(&holepunch_pb.HolePunch{
+					Type:     holepunch_pb.HolePunch_CONNECT.Enum(),
+					ObsAddrs: addrsToBytes([]ma.Multiaddr{ma.StringCast("/ip4/127.0.0.1/tcp/1234")}),
+				})
 				time.Sleep(10 * time.Second)
 			},
 			errMsg: "i/o deadline reached",
 		},
+		"initiator does NOT send any addresses in CONNECT": {
+			holePunchTimeout: 10 * time.Millisecond,
+			initiator: func(s network.Stream) {
+				protoio.NewDelimitedWriter(s).WriteMsg(&holepunch_pb.HolePunch{Type: holepunch_pb.HolePunch_CONNECT.Enum()})
+				time.Sleep(10 * time.Second)
+			},
+			errMsg: "expected CONNECT message to contain at least one address",
+		},
 	}
 
 	for name, tc := range tcs {
@@ -219,7 +257,7 @@
 			}
 
 			tr := &mockEventTracer{}
-			h1, h2, relay, _ := makeRelayedHosts(t, holepunch.WithTracer(tr), false)
+			h1, h2, relay, _ := makeRelayedHosts(t, holepunch.WithTracer(tr), nil, false)
 			defer h1.Close()
 			defer h2.Close()
 			defer relay.Close()
@@ -293,7 +331,7 @@
 	}, 5*time.Second, 50*time.Millisecond)
 }
 
-func mkHostWithStaticAutoRelay(t *testing.T, ctx context.Context, relay host.Host) host.Host {
+func mkHostWithStaticAutoRelay(t *testing.T, relay host.Host) host.Host {
 	if race.WithRace() {
 		t.Skip("modifying manet.Private4 is racy")
 	}
@@ -327,9 +365,13 @@
 	return h
 }
 
-func makeRelayedHosts(t *testing.T, h1Opt holepunch.Option, addHolePuncher bool) (h1, h2, relay host.Host, hps *holepunch.Service) {
+func makeRelayedHosts(t *testing.T, h1opt, h2opt holepunch.Option, addHolePuncher bool) (h1, h2, relay host.Host, hps *holepunch.Service) {
 	t.Helper()
-	h1, _ = mkHostWithHolePunchSvc(t, h1Opt)
+	var h1opts []holepunch.Option
+	if h1opt != nil {
+		h1opts = append(h1opts, h1opt)
+	}
+	h1, _ = mkHostWithHolePunchSvc(t, h1opts...)
 	var err error
 	relay, err = libp2p.New(libp2p.ListenAddrs(ma.StringCast("/ip4/127.0.0.1/tcp/0")), libp2p.DisableRelay())
 	require.NoError(t, err)
@@ -337,9 +379,9 @@
 	_, err = relayv1.NewRelay(relay)
 	require.NoError(t, err)
 
-	h2 = mkHostWithStaticAutoRelay(t, context.Background(), relay)
+	h2 = mkHostWithStaticAutoRelay(t, relay)
 	if addHolePuncher {
-		hps = addHolePunchService(t, h2)
+		hps = addHolePunchService(t, h2, h2opt)
 	}
 
 	// h1 has a relay addr
@@ -359,11 +401,13 @@
 	return
 }
 
-func addHolePunchService(t *testing.T, h host.Host) *holepunch.Service {
+func addHolePunchService(t *testing.T, h host.Host, opt holepunch.Option) *holepunch.Service {
 	t.Helper()
-	ids, err := identify.NewIDService(h)
-	require.NoError(t, err)
-	hps, err := holepunch.NewService(h, ids)
+	var opts []holepunch.Option
+	if opt != nil {
+		opts = append(opts, opt)
+	}
+	hps, err := holepunch.NewService(h, newMockIDService(t, h), opts...)
 	require.NoError(t, err)
 	return hps
 }
@@ -372,9 +416,7 @@ func mkHostWithHolePunchSvc(t *testing.T, opts ...holepunch.Option) (host.Host,
 	t.Helper()
 	h, err := libp2p.New(libp2p.ListenAddrs(ma.StringCast("/ip4/127.0.0.1/tcp/0"), ma.StringCast("/ip6/::1/tcp/0")))
 	require.NoError(t, err)
-	ids, err := identify.NewIDService(h)
-	require.NoError(t, err)
-	hps, err := holepunch.NewService(h, ids, opts...)
+	hps, err := holepunch.NewService(h, newMockIDService(t, h), opts...)
 	require.NoError(t, err)
 	return h, hps
 }
diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go
index 5bee2176f9..0ceb17ce56 100644
--- a/p2p/protocol/identify/id.go
+++ b/p2p/protocol/identify/id.go
@@ -73,15 +73,33 @@ type rmPeerHandlerReq struct {
 	p peer.ID
 }
 
-// IDService is a structure that implements ProtocolIdentify.
+type IDService interface {
+	// IdentifyConn synchronously triggers an identify request on the connection and
+	// waits for it to complete. If the connection is being identified by another
+	// caller, this call will wait. If the connection has already been identified,
+	// it will return immediately.
+	IdentifyConn(network.Conn)
+	// IdentifyWait triggers an identify (if the connection has not already been
+	// identified) and returns a channel that is closed when the identify protocol
+	// completes.
+	IdentifyWait(network.Conn) <-chan struct{}
+	// OwnObservedAddrs returns the addresses peers have reported we've dialed from
+	OwnObservedAddrs() []ma.Multiaddr
+	// ObservedAddrsFor returns the addresses peers have reported we've dialed from,
+	// for a specific local address.
+	ObservedAddrsFor(local ma.Multiaddr) []ma.Multiaddr
+	io.Closer
+}
+
+// idService is a structure that implements ProtocolIdentify.
 // It is a trivial service that gives the other peer some
 // useful information about the local peer. A sort of hello.
 //
-// The IDService sends:
+// The idService sends:
 // * Our IPFS Protocol Version
 // * Our IPFS Agent Version
 // * Our public Listen Addresses
-type IDService struct {
+type idService struct {
 	Host      host.Host
 	UserAgent string
@@ -111,9 +129,9 @@
 	rmPeerHandlerCh chan rmPeerHandlerReq
 }
 
-// NewIDService constructs a new *IDService and activates it by
+// NewIDService constructs a new *idService and activates it by
 // attaching its stream handler to the given host.Host.
-func NewIDService(h host.Host, opts ...Option) (*IDService, error) {
+func NewIDService(h host.Host, opts ...Option) (*idService, error) {
 	var cfg config
 	for _, opt := range opts {
 		opt(&cfg)
@@ -124,7 +142,7 @@
 		userAgent = cfg.userAgent
 	}
 
-	s := &IDService{
+	s := &idService{
 		Host:      h,
 		UserAgent: userAgent,
 
@@ -171,7 +189,7 @@
 	return s, nil
 }
 
-func (ids *IDService) loop() {
+func (ids *idService) loop() {
 	defer ids.refCount.Done()
 
 	phs := make(map[peer.ID]*peerHandler)
@@ -270,35 +288,27 @@
 	}
 }
 
-// Close shuts down the IDService
-func (ids *IDService) Close() error {
+// Close shuts down the idService
+func (ids *idService) Close() error {
 	ids.ctxCancel()
 	ids.observedAddrs.Close()
 	ids.refCount.Wait()
 	return nil
 }
 
-// OwnObservedAddrs returns the addresses peers have reported we've dialed from
-func (ids *IDService) OwnObservedAddrs() []ma.Multiaddr {
+func (ids *idService) OwnObservedAddrs() []ma.Multiaddr {
 	return ids.observedAddrs.Addrs()
 }
 
-func (ids *IDService) ObservedAddrsFor(local ma.Multiaddr) []ma.Multiaddr {
+func (ids *idService) ObservedAddrsFor(local ma.Multiaddr) []ma.Multiaddr {
 	return ids.observedAddrs.AddrsFor(local)
 }
 
-// IdentifyConn synchronously triggers an identify request on the connection and
-// waits for it to complete. If the connection is being identified by another
-// caller, this call will wait. If the connection has already been identified,
-// it will return immediately.
-func (ids *IDService) IdentifyConn(c network.Conn) {
+func (ids *idService) IdentifyConn(c network.Conn) {
 	<-ids.IdentifyWait(c)
 }
 
-// IdentifyWait triggers an identify (if the connection has not already been
-// identified) and returns a channel that is closed when the identify protocol
-// completes.
-func (ids *IDService) IdentifyWait(c network.Conn) <-chan struct{} {
+func (ids *idService) IdentifyWait(c network.Conn) <-chan struct{} {
 	ids.connsMu.RLock()
 	wait, found := ids.conns[c]
 	ids.connsMu.RUnlock()
@@ -325,13 +335,13 @@
 	return wait
 }
 
-func (ids *IDService) removeConn(c network.Conn) {
+func (ids *idService) removeConn(c network.Conn) {
 	ids.connsMu.Lock()
 	delete(ids.conns, c)
 	ids.connsMu.Unlock()
 }
 
-func (ids *IDService) identifyConn(c network.Conn, signal chan struct{}) {
+func (ids *idService) identifyConn(c network.Conn, signal chan struct{}) {
 	var (
 		s   network.Stream
 		err error
@@ -375,7 +385,7 @@
 	err = ids.handleIdentifyResponse(s)
 }
 
-func (ids *IDService) sendIdentifyResp(s network.Stream) {
+func (ids *idService) sendIdentifyResp(s network.Stream) {
 	defer s.Close()
 
 	c := s.Conn()
@@ -407,7 +417,7 @@
 	log.Debugf("%s sent message to %s %s", ID, c.RemotePeer(), c.RemoteMultiaddr())
 }
 
-func (ids *IDService) handleIdentifyResponse(s network.Stream) error {
+func (ids *idService) handleIdentifyResponse(s network.Stream) error {
 	_ = s.SetReadDeadline(time.Now().Add(StreamReadTimeout))
 
 	c := s.Conn()
@@ -446,7 +456,7 @@
 	return fmt.Errorf("too many parts")
 }
 
-func (ids *IDService) getSnapshot() *identifySnapshot {
+func (ids *idService) getSnapshot() *identifySnapshot {
 	snapshot := new(identifySnapshot)
 	if !ids.disableSignedPeerRecord {
 		if cab, ok := peerstore.GetCertifiedAddrBook(ids.Host.Peerstore()); ok {
@@ -458,7 +468,7 @@
 	return snapshot
 }
 
-func (ids *IDService) writeChunkedIdentifyMsg(c network.Conn, snapshot *identifySnapshot, s network.Stream) error {
+func (ids *idService) writeChunkedIdentifyMsg(c network.Conn, snapshot *identifySnapshot, s network.Stream) error {
 	mes := ids.createBaseIdentifyResponse(c, snapshot)
 	sr := ids.getSignedRecord(snapshot)
 	mes.SignedPeerRecord = sr
@@ -479,7 +489,7 @@
 
 }
 
-func (ids *IDService) createBaseIdentifyResponse(
+func (ids *idService) createBaseIdentifyResponse(
 	conn network.Conn,
 	snapshot *identifySnapshot,
 ) *pb.Identify {
@@ -536,7 +546,7 @@
 	return mes
 }
 
-func (ids *IDService) getSignedRecord(snapshot *identifySnapshot) []byte {
+func (ids *idService) getSignedRecord(snapshot *identifySnapshot) []byte {
 	if ids.disableSignedPeerRecord || snapshot.record == nil {
 		return nil
 	}
@@ -550,7 +560,7 @@
 	return recBytes
 }
 
-func (ids *IDService) consumeMessage(mes *pb.Identify, c network.Conn) {
+func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn) {
 	p := c.RemotePeer()
 
 	// mes.Protocols
@@ -632,7 +642,7 @@
 	ids.consumeReceivedPubKey(c, mes.PublicKey)
 }
 
-func (ids *IDService) consumeReceivedPubKey(c network.Conn, kb []byte) {
+func (ids *idService) consumeReceivedPubKey(c network.Conn, kb []byte) {
 	lp := c.LocalPeer()
 	rp := c.RemotePeer()
 
@@ -735,7 +745,7 @@
 	return false
 }
 
-func (ids *IDService) consumeObservedAddress(observed []byte, c network.Conn) {
+func (ids *idService) consumeObservedAddress(observed []byte, c network.Conn) {
 	if observed == nil {
 		return
 	}
@@ -767,10 +777,10 @@
 }
 
 // netNotifiee defines methods to be used with the IpfsDHT
-type netNotifiee IDService
+type netNotifiee idService
 
-func (nn *netNotifiee) IDService() *IDService {
-	return (*IDService)(nn)
+func (nn *netNotifiee) IDService() *idService {
+	return (*idService)(nn)
 }
 
 func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
diff --git a/p2p/protocol/identify/id_delta.go b/p2p/protocol/identify/id_delta.go
index a7492bee01..738fcc386b 100644
--- a/p2p/protocol/identify/id_delta.go
+++ b/p2p/protocol/identify/id_delta.go
@@ -1,11 +1,12 @@
 package identify
 
 import (
+	"time"
+
 	"github.com/libp2p/go-libp2p-core/event"
 	"github.com/libp2p/go-libp2p-core/network"
 	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/libp2p/go-libp2p-core/protocol"
-	"time"
 
 	pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb"
 
@@ -15,7 +16,7 @@
 const IDDelta = "/p2p/id/delta/1.0.0"
 
 // deltaHandler handles incoming delta updates from peers.
-func (ids *IDService) deltaHandler(s network.Stream) {
+func (ids *idService) deltaHandler(s network.Stream) {
 	_ = s.SetReadDeadline(time.Now().Add(StreamReadTimeout))
 
 	c := s.Conn()
@@ -46,7 +47,7 @@
 
 // consumeDelta processes an incoming delta from a peer, updating the peerstore
 // and emitting the appropriate events.
-func (ids *IDService) consumeDelta(id peer.ID, delta *pb.Delta) error {
+func (ids *idService) consumeDelta(id peer.ID, delta *pb.Delta) error {
 	err := ids.Host.Peerstore().AddProtocols(id, delta.GetAddedProtocols()...)
 	if err != nil {
 		return err
diff --git a/p2p/protocol/identify/id_push.go b/p2p/protocol/identify/id_push.go
index c2977e4bfe..1f644d1ee9 100644
--- a/p2p/protocol/identify/id_push.go
+++ b/p2p/protocol/identify/id_push.go
@@ -12,6 +12,6 @@
 const IDPush = "/ipfs/id/push/1.0.0"
 
 // pushHandler handles incoming identify push streams. The behaviour is identical to the ordinary identify protocol.
-func (ids *IDService) pushHandler(s network.Stream) {
+func (ids *idService) pushHandler(s network.Stream) {
 	ids.handleIdentifyResponse(s)
 }
diff --git a/p2p/protocol/identify/id_test.go b/p2p/protocol/identify/id_test.go
index 8008687881..415dc92cb6 100644
--- a/p2p/protocol/identify/id_test.go
+++ b/p2p/protocol/identify/id_test.go
@@ -81,7 +81,7 @@ func subtestIDService(t *testing.T) {
 
 	ids1.IdentifyConn(h1t2c[0])
 
-	// the IDService should be opened automatically, by the network.
+	// the idService should be opened automatically, by the network.
 	// what we should see now is that both peers know about each others listen addresses.
 	t.Log("test peer1 has peer2 addrs correctly")
 	testKnowsAddrs(t, h1, h2p, h2.Addrs()) // has them
@@ -857,7 +857,7 @@ func TestLargeIdentifyMessage(t *testing.T) {
 
 	ids1.IdentifyConn(h1t2c[0])
 
-	// the IDService should be opened automatically, by the network.
+	// the idService should be opened automatically, by the network.
 	// what we should see now is that both peers know about each others listen addresses.
 	t.Log("test peer1 has peer2 addrs correctly")
 	testKnowsAddrs(t, h1, h2p, h2.Addrs()) // has them
diff --git a/p2p/protocol/identify/peer_loop.go b/p2p/protocol/identify/peer_loop.go
index 4c3b675840..ad62910f4a 100644
--- a/p2p/protocol/identify/peer_loop.go
+++ b/p2p/protocol/identify/peer_loop.go
@@ -27,7 +27,7 @@ type identifySnapshot struct {
 }
 
 type peerHandler struct {
-	ids *IDService
+	ids *idService
 
 	cancel context.CancelFunc
 
@@ -40,7 +40,7 @@ type peerHandler struct {
 	deltaCh chan struct{}
 }
 
-func newPeerHandler(pid peer.ID, ids *IDService) *peerHandler {
+func newPeerHandler(pid peer.ID, ids *idService) *peerHandler {
 	ph := &peerHandler{
 		ids: ids,
 		pid: pid,
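With IDService now an interface (see the id.go changes above), consumers such as the hole punch service can be written against behaviour rather than the concrete *idService, and tests can substitute partial fakes like mockIDService. The sketch below shows one way a consumer might depend on the new interface, assuming the go-libp2p and go-libp2p-core import paths used elsewhere in this diff; waitForIdentify and addrSource are illustrative helpers, not part of the patch.

```go
package main

import (
	"context"
	"fmt"

	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p/p2p/protocol/identify"
	ma "github.com/multiformats/go-multiaddr"
)

// addrSource is the narrow slice of identify.IDService this consumer needs.
// Any identify.IDService (including test fakes) satisfies it.
type addrSource interface {
	OwnObservedAddrs() []ma.Multiaddr
}

// Compile-time check: the new interface is assignable to the narrower one.
var _ addrSource = identify.IDService(nil)

// waitForIdentify mirrors the pattern used in the holepunch netNotifiee above:
// wait for identify to complete on a connection, then read our observed addrs.
func waitForIdentify(ctx context.Context, ids identify.IDService, conn network.Conn) ([]ma.Multiaddr, error) {
	select {
	case <-ids.IdentifyWait(conn):
		return ids.OwnObservedAddrs(), nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	fmt.Println("consumers of identify only need the IDService interface, not *idService")
}
```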