diff --git a/p2p/discovery.go b/p2p/discovery.go
index d03e649a3..8b290ff1f 100644
--- a/p2p/discovery.go
+++ b/p2p/discovery.go
@@ -291,7 +291,7 @@ func NewDiscoveryRouter(tcpNode host.Host, udpNode *MutableUDPNode, peers []Peer
// it returns false if the peer isn't discovered.
func getDiscoveredAddress(udpNode *MutableUDPNode, p Peer) (ma.Multiaddr, bool, error) {
resolved := udpNode.Resolve(&p.Enode)
- if resolved.Seq() == 0 || resolved.TCP() == 0 {
+ if resolved.Seq() == p.Enode.Seq() || resolved.TCP() == 0 {
return nil, false, nil // Not discovered
}
diff --git a/p2p/errors.go b/p2p/errors.go
new file mode 100644
index 000000000..d8b3dd579
--- /dev/null
+++ b/p2p/errors.go
@@ -0,0 +1,86 @@
+// Copyright © 2022 Obol Labs Inc.
+//
+// This program is free software: you can redistribute it and/or modify it
+// under the terms of the GNU General Public License as published by the Free
+// Software Foundation, either version 3 of the License, or (at your option)
+// any later version.
+//
+// This program is distributed in the hope that it will be useful, but WITHOUT
+// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+// more details.
+//
+// You should have received a copy of the GNU General Public License along with
+// this program. If not, see <http://www.gnu.org/licenses/>.
+
+package p2p
+
+import (
+ "strings"
+
+ "github.com/libp2p/go-libp2p/core/peer"
+ "github.com/libp2p/go-libp2p/p2p/net/swarm"
+ ma "github.com/multiformats/go-multiaddr"
+
+ "github.com/obolnetwork/charon/app/errors"
+)
+
+// hasErrDialBackoff returns true if the error contains swarm.ErrDialBackoff.
+func hasErrDialBackoff(err error) bool {
+ dErr := new(swarm.DialError)
+ if !errors.As(err, &dErr) {
+ return false
+ }
+
+ for _, trErr := range dErr.DialErrors {
+ if errors.Is(trErr.Cause, swarm.ErrDialBackoff) {
+ return true
+ }
+ }
+
+ return false
+}
+
+// dialErrMsgs returns a map of dial error messages by named address or false if the error is not a swarm.DialError.
+func dialErrMsgs(err error) (map[string]string, bool) {
+ dErr := new(swarm.DialError)
+ if !errors.As(err, &dErr) {
+ return nil, false
+ }
+
+ // We do not expect cause to be populated.
+ if dErr.Cause != nil {
+ return nil, false
+ }
+
+ resp := make(map[string]string)
+ for _, trErr := range dErr.DialErrors {
+ resp[NamedAddr(trErr.Address)] = trErr.Cause.Error()
+ }
+
+ return resp, true
+}
+
+// NamedAddr renders the multiaddr as a slash separated string in which the value of every
+// decodable p2p component (a peer ID) is substituted by the peer's PeerName.
+func NamedAddr(addr ma.Multiaddr) string {
+	var parts []string
+
+	ma.ForEach(addr, func(comp ma.Component) bool {
+		proto := comp.Protocol()
+
+		if proto.Code == ma.P_P2P {
+			id, err := peer.Decode(comp.Value())
+			if err == nil {
+				// Swap the peer ID for its name.
+				parts = append(parts, proto.Name, PeerName(id))
+				return true
+			}
+		}
+
+		// Any other component, or a p2p component that doesn't decode, is rendered as is.
+		if name := proto.Name; name != "" {
+			parts = append(parts, name)
+		}
+		if val := comp.Value(); val != "" {
+			// Drop a leading slash of the value so the join below doesn't emit "//".
+			parts = append(parts, strings.TrimPrefix(val, "/"))
+		}
+
+		return true
+	})
+
+	return "/" + strings.Join(parts, "/")
+}
diff --git a/p2p/errors_internal_test.go b/p2p/errors_internal_test.go
new file mode 100644
index 000000000..863dfd11c
--- /dev/null
+++ b/p2p/errors_internal_test.go
@@ -0,0 +1,104 @@
+// Copyright © 2022 Obol Labs Inc.
+//
+// This program is free software: you can redistribute it and/or modify it
+// under the terms of the GNU General Public License as published by the Free
+// Software Foundation, either version 3 of the License, or (at your option)
+// any later version.
+//
+// This program is distributed in the hope that it will be useful, but WITHOUT
+// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+// more details.
+//
+// You should have received a copy of the GNU General Public License along with
+// this program. If not, see <http://www.gnu.org/licenses/>.
+
+package p2p
+
+import (
+ "context"
+ "testing"
+
+ "github.com/libp2p/go-libp2p"
+ "github.com/libp2p/go-libp2p/core/peerstore"
+ ma "github.com/multiformats/go-multiaddr"
+ "github.com/stretchr/testify/require"
+
+ "github.com/obolnetwork/charon/testutil"
+)
+
+//go:generate go test . -clean -update
+
+func TestNamedAddr(t *testing.T) {
+ // Copied from github.com/multiformats/go-multiaddr/multiaddr_test.go
+ addrs := []string{
+ "/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC/tcp/1234",
+ "/p2p/k2k4r8oqamigqdo6o7hsbfwd45y70oyynp98usk7zmyfrzpqxh1pohl7/tcp/1234",
+ "/ip4/127.0.0.1/udp/1234",
+ "/ip4/127.0.0.1/udp/0",
+ "/ip4/127.0.0.1/tcp/1234",
+ "/ip4/127.0.0.1/tcp/1234/",
+ "/ip4/127.0.0.1/udp/1234/quic",
+ "/ip4/127.0.0.1/udp/1234/quic/webtransport",
+ "/ip4/127.0.0.1/udp/1234/quic/webtransport/certhash/b2uaraocy6yrdblb4sfptaddgimjmmpy",
+ "/ip4/127.0.0.1/udp/1234/quic/webtransport/certhash/b2uaraocy6yrdblb4sfptaddgimjmmpy/certhash/zQmbWTwYGcmdyK9CYfNBcfs9nhZs17a6FQ4Y8oea278xx41",
+ "/ip4/127.0.0.1/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC",
+ "/ip4/127.0.0.1/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC/tcp/1234",
+ "/ip4/127.0.0.1/ipfs/k2k4r8oqamigqdo6o7hsbfwd45y70oyynp98usk7zmyfrzpqxh1pohl7",
+ "/ip4/127.0.0.1/ipfs/k2k4r8oqamigqdo6o7hsbfwd45y70oyynp98usk7zmyfrzpqxh1pohl7/tcp/1234",
+ "/ip4/127.0.0.1/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC",
+ "/ip4/127.0.0.1/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC/tcp/1234",
+ "/ip4/127.0.0.1/p2p/k2k4r8oqamigqdo6o7hsbfwd45y70oyynp98usk7zmyfrzpqxh1pohl7",
+ "/ip4/127.0.0.1/p2p/k2k4r8oqamigqdo6o7hsbfwd45y70oyynp98usk7zmyfrzpqxh1pohl7/tcp/1234",
+ "/unix/a/b/c/d/e",
+ "/unix/stdio",
+ "/ip4/1.2.3.4/tcp/80/unix/a/b/c/d/e/f",
+ "/ip4/127.0.0.1/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC/tcp/1234/unix/stdio",
+ }
+
+ var resp []string
+ for _, addr := range addrs {
+ a, err := ma.NewMultiaddr(addr)
+ require.NoError(t, err)
+
+ resp = append(resp, NamedAddr(a))
+ }
+
+ testutil.RequireGoldenJSON(t, resp)
+}
+
+// TestDialErrMsgs dials a peer that is known by two addresses, one guarded by a closed
+// connection gater and one that nothing listens on. It asserts the per-address messages of
+// the first dial, and the dial backoff reported for both addresses by an immediate second dial.
+func TestDialErrMsgs(t *testing.T) {
+	ctx := context.Background()
+
+	closed, err := NewConnGater(nil, nil)
+	require.NoError(t, err)
+
+	hostA := testutil.CreateHost(t, testutil.AvailableAddr(t))
+	hostB := testutil.CreateHost(t, testutil.AvailableAddr(t), libp2p.ConnectionGater(closed))
+	t.Cleanup(func() {
+		// Release the listeners and background resources of both hosts.
+		_ = hostA.Close()
+		_ = hostB.Close()
+	})
+	hostBAddr := hostB.Addrs()[0]
+
+	// Pick a currently free port rather than a hard-coded one: if anything happens to listen
+	// on a fixed port the expected "connection refused" turns into a different error.
+	// It is picked after the hosts are listening so it cannot collide with their ports.
+	badAddr := testutil.AvailableMultiAddr(t)
+
+	hostA.Peerstore().AddAddr(hostB.ID(), hostBAddr, peerstore.TempAddrTTL) // Gater will block these
+	hostA.Peerstore().AddAddr(hostB.ID(), badAddr, peerstore.TempAddrTTL)   // Connection refused
+
+	_, err = hostA.Network().DialPeer(ctx, hostB.ID()) // Try dial
+	require.Error(t, err)
+
+	msgs, ok := dialErrMsgs(err)
+	require.True(t, ok)
+	require.False(t, hasErrDialBackoff(err))
+	require.Len(t, msgs, 2)
+	require.Contains(t, msgs[badAddr.String()], "connection refused")
+	require.Contains(t, msgs[hostBAddr.String()], "failed to negotiate stream multiplexer")
+
+	_, err = hostA.Network().DialPeer(ctx, hostB.ID()) // Try dial again
+	require.Error(t, err)
+
+	// Both addresses failed a moment ago, so the second attempt reports dial backoff for each.
+	msgs, ok = dialErrMsgs(err)
+	require.True(t, ok)
+	require.True(t, hasErrDialBackoff(err))
+	require.Len(t, msgs, 2)
+	require.Contains(t, msgs[badAddr.String()], "dial backoff")
+	require.Contains(t, msgs[hostBAddr.String()], "dial backoff")
+}
diff --git a/p2p/name.go b/p2p/name.go
index f47973453..bc404928a 100644
--- a/p2p/name.go
+++ b/p2p/name.go
@@ -388,7 +388,7 @@ func randomName(pk ecdsa.PublicKey) string { //nolint:deadcode
return fmt.Sprintf("%s-%s", adjectives[adjIdx], nouns[nounIdx])
}
-// PeerName calculates the polynomial rolling hash of the peerID string.
+// PeerName returns a deterministic, pseudo-random, human-friendly name for the peer ID.
func PeerName(id peer.ID) string {
// p is chosen to be 59 because it's prime and roughly equal to the no of different characters
// you can have in base58 encoded strings. Base58 encoded strings can consist of 58 different
diff --git a/p2p/ping.go b/p2p/ping.go
index 106f26b00..2aa18356a 100644
--- a/p2p/ping.go
+++ b/p2p/ping.go
@@ -17,7 +17,7 @@ package p2p
import (
"context"
- "sync"
+ "fmt"
"time"
"github.com/libp2p/go-libp2p/core/host"
@@ -27,16 +27,15 @@ import (
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/obolnetwork/charon/app/errors"
+ "github.com/obolnetwork/charon/app/expbackoff"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
)
// NewPingService returns a start function of a p2p ping service that pings all peers every second
// and collects metrics.
-// TODO(corver): Cluster wide req/resp doesn't scale since it is O(n^2).
func NewPingService(h host.Host, peers []peer.ID, callback func(peer.ID)) func(context.Context) {
svc := ping.NewPingService(h)
- logFunc := newPingLogger(peers)
return func(ctx context.Context) {
ctx = log.WithTopic(ctx, "ping")
@@ -47,27 +46,26 @@ func NewPingService(h host.Host, peers []peer.ID, callback func(peer.ID)) func(c
continue
}
- go pingPeer(ctx, svc, p, logFunc, callback)
+ go pingPeer(ctx, svc, p, callback)
}
}
}
// pingPeer starts (and restarts) a long-lived ping service stream, pinging the peer every second until some error.
// It returns when the context is cancelled.
+// Restarts are separated by a call to an expbackoff function configured with a 30s max delay,
+// and results are logged through a ping logger created for this peer only.
-func pingPeer(ctx context.Context, svc *ping.PingService, p peer.ID,
- logFunc func(context.Context, peer.ID, ping.Result), callback func(peer.ID),
+func pingPeer(ctx context.Context, svc *ping.PingService, p peer.ID, callback func(peer.ID),
) {
+ backoff := expbackoff.New(ctx, expbackoff.WithMaxDelay(time.Second*30)) // Start quick, then slow down
+ logFunc := newPingLogger(svc.Host, p)
for ctx.Err() == nil {
pingPeerOnce(ctx, svc, p, logFunc, callback)
-
- const backoff = time.Second
- time.Sleep(backoff)
+ // NOTE(review): backoff is created once and never reset here, so the delay between restarts
+ // presumably keeps growing (up to the max) even after a long healthy session — confirm intended.
+ backoff()
}
}
// pingPeerOnce starts a long lived ping connection with the peer and returns on first error.
func pingPeerOnce(ctx context.Context, svc *ping.PingService, p peer.ID,
- logFunc func(context.Context, peer.ID, ping.Result), callback func(peer.ID),
+ logFunc func(context.Context, ping.Result), callback func(peer.ID),
) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -78,7 +76,7 @@ func pingPeerOnce(ctx context.Context, svc *ping.PingService, p peer.ID,
return
}
- logFunc(ctx, p, result)
+ logFunc(ctx, result)
if result.Error != nil {
incPingError(p)
@@ -88,6 +86,7 @@ func pingPeerOnce(ctx context.Context, svc *ping.PingService, p peer.ID,
}
observePing(p, result.RTT)
+
if callback != nil {
callback(p)
}
@@ -100,54 +99,98 @@ func isRelayError(err error) bool {
errors.Is(err, network.ErrResourceScopeClosed)
}
-// newPingLogger returns stateful logging function that logs ping failures
-// and recoveries after applying hysteresis; only logging after N opposite results.
-func newPingLogger(peers []peer.ID) func(context.Context, peer.ID, ping.Result) {
- const hysteresis = 5 // N = 5
-
+// newPingLogger returns a stateful logging function for peer p that logs "real" dial errors
+// when they change, or again once the 10min clear period has elapsed.
+// This is the main logger of "why are we not connected to peer X".
+// The returned function keeps its state in closure variables without any locking.
+func newPingLogger(tcpNode host.Host, p peer.ID) func(context.Context, ping.Result) {
var (
- mu sync.Mutex
- first = make(map[peer.ID]bool) // first indicates if the peer has logged anything.
- state = make(map[peer.ID]bool) // state indicates if the peer is ok or not
- counts = make(map[peer.ID]int) // counts indicates number of successful pings; 0 <= x <= hysteresis
- errs = make(map[peer.ID]error) // errs contains last non-dial backoff error
+ prevMsgs = make(map[string]string) // Dial error messages (by named address) of the last failure that got past the duplicate check
+ prevResolvedMsgs = make(map[string]string) // Last messages obtained from resolveBackoffMsgs that were logged
+ prevSuccess bool // Whether the previous result was a success; starts out false
+ clearedAt = time.Now() // When prevMsgs and prevResolvedMsgs were last reset
+ clearPeriod = time.Minute * 10 // Log same msgs every 10min
)
- for _, p := range peers {
- state[p] = true
- counts[p] = hysteresis
- errs[p] = swarm.ErrDialBackoff // This will be over-written with no-dial-backoff errors if any.
+ // sameMsgs reports whether msgs equals either of the remembered message sets.
+ // The comparison relies on fmt.Sprint printing maps in sorted key order.
+ sameMsgs := func(msgs map[string]string) bool {
+ return fmt.Sprint(msgs) == fmt.Sprint(prevMsgs) || fmt.Sprint(msgs) == fmt.Sprint(prevResolvedMsgs)
}
- return func(ctx context.Context, p peer.ID, result ping.Result) {
- mu.Lock()
- defer mu.Unlock()
+ return func(ctx context.Context, result ping.Result) {
+ if result.Error == nil && prevSuccess {
+ // All still good
+ return
+ } else if result.Error == nil && !prevSuccess {
+ // Reconnected
+ log.Info(ctx, "Peer connected", z.Str("peer", PeerName(p)), z.Any("rtt", result.RTT))
+ prevSuccess = true
- prev := counts[p]
+ return
+ }
- if result.Error != nil && prev > 0 {
- counts[p]-- // Decrease success count since ping failed.
- } else if result.Error == nil && prev < hysteresis {
- counts[p]++ // Increase success count since ping succeeded.
+ // Only failures get here. Forget the remembered messages once the clear period has
+ // elapsed so that a persistent failure is logged again.
+ if time.Since(clearedAt) > clearPeriod {
+ prevMsgs = make(map[string]string)
+ prevResolvedMsgs = make(map[string]string)
+ clearedAt = time.Now()
}
- if result.Error != nil && !errors.Is(result.Error, swarm.ErrDialBackoff) {
- errs[p] = result.Error // Dial backoff errors are not informative, cache others.
+ msgs, ok := dialErrMsgs(result.Error)
+ if !ok { // Unexpected non-dial reason...
+ // NOTE(review): since prevSuccess starts out false, a non-dial error that occurs before
+ // the first successful ping is never logged — confirm intended.
+ if prevSuccess {
+ log.Warn(ctx, "Peer ping failing", nil, z.Str("peer", PeerName(p)), z.Str("error", result.Error.Error()))
+ }
+ prevSuccess = false
+
+ return
}
- now := counts[p]
- ok := state[p]
-
- if prev > 0 && now == 0 && ok {
- log.Warn(ctx, "Peer ping failing", nil, z.Str("peer", PeerName(p)), z.Str("error", errs[p].Error()))
- state[p] = false
- first[p] = true
- } else if prev < hysteresis && now == hysteresis && !ok {
- log.Info(ctx, "Peer ping recovered", z.Str("peer", PeerName(p)), z.Any("rtt", result.RTT))
- state[p] = true
- } else if result.Error == nil && !first[p] {
- log.Info(ctx, "Peer ping success", z.Str("peer", PeerName(p)), z.Any("rtt", result.RTT))
- first[p] = true
+ if !prevSuccess && sameMsgs(msgs) {
+ // Still failing for the same reasons, don't log
+ return
}
+
+ prevMsgs = msgs
+ prevSuccess = false
+
+ // Log when failing after success or failing for different reasons
+
+ // Dial backoff messages are replaced by whatever an immediate re-dial reports.
+ if hasErrDialBackoff(result.Error) {
+ msgs = resolveBackoffMsgs(ctx, tcpNode, p) // Best effort resolving of dial backoff errors.
+ if len(msgs) == 0 || sameMsgs(msgs) {
+ // No more errors, or same messages, ok well...
+ return
+ }
+ prevResolvedMsgs = msgs
+ }
+
+ // TODO(corver): Reconsider this logging format
+ // Each address becomes its own log field; map iteration makes the field order non-deterministic.
+ opts := []z.Field{z.Str("peer", PeerName(p))}
+ for addr, msg := range msgs {
+ opts = append(opts, z.Str(addr, msg))
+ }
+
+ log.Warn(ctx, "Peer not connected", nil, opts...)
+ }
+}
+
+// resolveBackoffMsgs makes a best effort attempt at finding out why the peer cannot be dialed:
+// it clears the peer's dial backoff on the swarm, dials the peer again and returns the resulting
+// dial error messages by named address. It returns nil if the host's network is not a swarm,
+// if the dial succeeds, or if the dial fails with something dialErrMsgs doesn't accept.
+func resolveBackoffMsgs(ctx context.Context, tcpNode host.Host, p peer.ID) map[string]string {
+	sw, isSwarm := tcpNode.Network().(*swarm.Swarm)
+	if !isSwarm {
+		log.Error(ctx, "Not a swarm network", nil)
+		return nil
}
+
+	// Forget the backoff first, presumably so that the dial below is actually attempted.
+	sw.Backoff().Clear(p)
+
+	_, dialErr := sw.DialPeer(ctx, p)
+	if dialErr == nil {
+		return nil // Connected now....
+	}
+
+	if msgs, ok := dialErrMsgs(dialErr); ok {
+		return msgs
+	}
+
+	// Some other error
+	log.Warn(ctx, "Peer dial failing", nil, z.Str("peer", PeerName(p)), z.Str("error", dialErr.Error()))
+
+	return nil
+}
diff --git a/p2p/sender.go b/p2p/sender.go
index e8a3df8e7..cdf7ff43c 100644
--- a/p2p/sender.go
+++ b/p2p/sender.go
@@ -88,7 +88,11 @@ func (s *Sender) addResult(ctx context.Context, peerID peer.ID, err error) {
}
} else if failure && (len(state.buffer) == 1 || !state.failing) {
// First attempt failed or state changed to failing
- log.Warn(ctx, "P2P sending failing", err, z.Str("peer", PeerName(peerID)))
+
+ if _, ok := dialErrMsgs(err); !ok { // Only log non-dial errors
+ log.Warn(ctx, "P2P sending failing", err, z.Str("peer", PeerName(peerID)))
+ }
+
state.failing = true
}
diff --git a/p2p/testdata/TestNamedAddr.golden b/p2p/testdata/TestNamedAddr.golden
new file mode 100644
index 000000000..ff3fa1457
--- /dev/null
+++ b/p2p/testdata/TestNamedAddr.golden
@@ -0,0 +1,24 @@
+[
+ "/p2p/anxious-dates/tcp/1234",
+ "/p2p/anxious-dates/tcp/1234",
+ "/ip4/127.0.0.1/udp/1234",
+ "/ip4/127.0.0.1/udp/0",
+ "/ip4/127.0.0.1/tcp/1234",
+ "/ip4/127.0.0.1/tcp/1234",
+ "/ip4/127.0.0.1/udp/1234/quic",
+ "/ip4/127.0.0.1/udp/1234/quic/webtransport",
+ "/ip4/127.0.0.1/udp/1234/quic/webtransport/certhash/u1QEQOFj2IjCsPJFfMAxmQxLGPw",
+ "/ip4/127.0.0.1/udp/1234/quic/webtransport/certhash/u1QEQOFj2IjCsPJFfMAxmQxLGPw/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g",
+ "/ip4/127.0.0.1/p2p/anxious-dates",
+ "/ip4/127.0.0.1/p2p/anxious-dates/tcp/1234",
+ "/ip4/127.0.0.1/p2p/anxious-dates",
+ "/ip4/127.0.0.1/p2p/anxious-dates/tcp/1234",
+ "/ip4/127.0.0.1/p2p/anxious-dates",
+ "/ip4/127.0.0.1/p2p/anxious-dates/tcp/1234",
+ "/ip4/127.0.0.1/p2p/anxious-dates",
+ "/ip4/127.0.0.1/p2p/anxious-dates/tcp/1234",
+ "/unix/a/b/c/d/e",
+ "/unix/stdio",
+ "/ip4/1.2.3.4/tcp/80/unix/a/b/c/d/e/f",
+ "/ip4/127.0.0.1/p2p/anxious-dates/tcp/1234/unix/stdio"
+]
\ No newline at end of file
diff --git a/testutil/random.go b/testutil/random.go
index 93afc50c5..4d609f3bd 100644
--- a/testutil/random.go
+++ b/testutil/random.go
@@ -535,7 +535,7 @@ func AvailableMultiAddr(t *testing.T) multiaddr.Multiaddr {
return addr
}
-func CreateHost(t *testing.T, addr *net.TCPAddr) host.Host {
+func CreateHost(t *testing.T, addr *net.TCPAddr, opts ...libp2p.Option) host.Host {
t.Helper()
pkey, _, err := p2pcrypto.GenerateSecp256k1Key(crand.Reader)
require.NoError(t, err)
@@ -543,7 +543,11 @@ func CreateHost(t *testing.T, addr *net.TCPAddr) host.Host {
addrs, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", addr.IP, addr.Port))
require.NoError(t, err)
- h, err := libp2p.New(libp2p.Identity(pkey), libp2p.ListenAddrs(addrs))
+ opts2 := []libp2p.Option{libp2p.Identity(pkey), libp2p.ListenAddrs(addrs)}
+ opts2 = append(opts2, opts...)
+
+ h, err := libp2p.New(opts2...)
+
require.NoError(t, err)
return h