From fded64a5674d7220ba959fc16e9a23fa20084a4a Mon Sep 17 00:00:00 2001 From: corverroos Date: Fri, 19 Aug 2022 15:49:19 +0200 Subject: [PATCH 1/3] p2p: refactor connectedness logger --- p2p/discovery.go | 2 +- p2p/errors.go | 86 ++++++++++++++++++ p2p/errors_internal_test.go | 104 ++++++++++++++++++++++ p2p/name.go | 2 +- p2p/ping.go | 139 +++++++++++++++++++----------- p2p/sender.go | 6 +- p2p/testdata/TestNamedAddr.golden | 24 ++++++ testutil/random.go | 8 +- 8 files changed, 318 insertions(+), 53 deletions(-) create mode 100644 p2p/errors.go create mode 100644 p2p/errors_internal_test.go create mode 100644 p2p/testdata/TestNamedAddr.golden diff --git a/p2p/discovery.go b/p2p/discovery.go index d03e649a3..8b290ff1f 100644 --- a/p2p/discovery.go +++ b/p2p/discovery.go @@ -291,7 +291,7 @@ func NewDiscoveryRouter(tcpNode host.Host, udpNode *MutableUDPNode, peers []Peer // it returns false if the peer isn't discovered. func getDiscoveredAddress(udpNode *MutableUDPNode, p Peer) (ma.Multiaddr, bool, error) { resolved := udpNode.Resolve(&p.Enode) - if resolved.Seq() == 0 || resolved.TCP() == 0 { + if resolved.Seq() == p.Enode.Seq() || resolved.TCP() == 0 { return nil, false, nil // Not discovered } diff --git a/p2p/errors.go b/p2p/errors.go new file mode 100644 index 000000000..a4bac25ae --- /dev/null +++ b/p2p/errors.go @@ -0,0 +1,86 @@ +// Copyright © 2022 Obol Labs Inc. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU General Public License as published by the Free +// Software Foundation, either version 3 of the License, or (at your option) +// any later version. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +// more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see . + +package p2p + +import ( + "strings" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p/p2p/net/swarm" + ma "github.com/multiformats/go-multiaddr" + + "github.com/obolnetwork/charon/app/errors" +) + +// hasErrDialBackoff returns true if the error is contains swarm.ErrDialBackoff. +func hasErrDialBackoff(err error) bool { + dErr := new(swarm.DialError) + if !errors.As(err, &dErr) { + return false + } + + for _, trErr := range dErr.DialErrors { + if errors.Is(trErr.Cause, swarm.ErrDialBackoff) { + return true + } + } + + return false +} + +// dialErrMsgs returns a map of dial error messages by named address or false if the error is not a swarm.DialError. +func dialErrMsgs(err error) (map[string]string, bool) { + dErr := new(swarm.DialError) + if !errors.As(err, &dErr) { + return nil, false + } + + // We do not expect cause to be populated. + if dErr.Cause != nil { + return nil, false + } + + resp := make(map[string]string) + for _, trErr := range dErr.DialErrors { + resp[NamedAddr(trErr.Address)] = trErr.Cause.Error() + } + + return resp, true +} + +// NamedAddr returns the multiaddr as a string with peer names instead of peer IDs. +func NamedAddr(addr ma.Multiaddr) string { + var resp []string + + ma.ForEach(addr, func(c ma.Component) bool { + if c.Protocol().Code == ma.P_P2P { + if id, err := peer.Decode(c.Value()); err == nil { + resp = append(resp, c.Protocol().Name, PeerName(id)) + return true + } + } + if c.Protocol().Name != "" { + resp = append(resp, c.Protocol().Name) + } + if c.Value() != "" { + resp = append(resp, strings.TrimPrefix(c.Value(), "/")) + } + + return true + }) + + return "/" + strings.Join(resp, "/") +} diff --git a/p2p/errors_internal_test.go b/p2p/errors_internal_test.go new file mode 100644 index 000000000..9bc0ab3eb --- /dev/null +++ b/p2p/errors_internal_test.go @@ -0,0 +1,104 @@ +// Copyright © 2022 Obol Labs Inc. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU General Public License as published by the Free +// Software Foundation, either version 3 of the License, or (at your option) +// any later version. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +// more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see . + +package p2p + +import ( + "context" + "testing" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/peerstore" + ma "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" + + "github.com/obolnetwork/charon/testutil" +) + +//go:generate go test . -clean -update + +func TestNamedAddr(t *testing.T) { + // Copied from github.com/multiformats/go-multiaddr/multiaddr_test.go + addrs := []string{ + "/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC/tcp/1234", + "/p2p/k2k4r8oqamigqdo6o7hsbfwd45y70oyynp98usk7zmyfrzpqxh1pohl7/tcp/1234", + "/ip4/127.0.0.1/udp/1234", + "/ip4/127.0.0.1/udp/0", + "/ip4/127.0.0.1/tcp/1234", + "/ip4/127.0.0.1/tcp/1234/", + "/ip4/127.0.0.1/udp/1234/quic", + "/ip4/127.0.0.1/udp/1234/quic/webtransport", + "/ip4/127.0.0.1/udp/1234/quic/webtransport/certhash/b2uaraocy6yrdblb4sfptaddgimjmmpy", + "/ip4/127.0.0.1/udp/1234/quic/webtransport/certhash/b2uaraocy6yrdblb4sfptaddgimjmmpy/certhash/zQmbWTwYGcmdyK9CYfNBcfs9nhZs17a6FQ4Y8oea278xx41", + "/ip4/127.0.0.1/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC", + "/ip4/127.0.0.1/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC/tcp/1234", + "/ip4/127.0.0.1/ipfs/k2k4r8oqamigqdo6o7hsbfwd45y70oyynp98usk7zmyfrzpqxh1pohl7", + "/ip4/127.0.0.1/ipfs/k2k4r8oqamigqdo6o7hsbfwd45y70oyynp98usk7zmyfrzpqxh1pohl7/tcp/1234", + "/ip4/127.0.0.1/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC", + "/ip4/127.0.0.1/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC/tcp/1234", + "/ip4/127.0.0.1/p2p/k2k4r8oqamigqdo6o7hsbfwd45y70oyynp98usk7zmyfrzpqxh1pohl7", + "/ip4/127.0.0.1/p2p/k2k4r8oqamigqdo6o7hsbfwd45y70oyynp98usk7zmyfrzpqxh1pohl7/tcp/1234", + "/unix/a/b/c/d/e", + "/unix/stdio", + "/ip4/1.2.3.4/tcp/80/unix/a/b/c/d/e/f", + "/ip4/127.0.0.1/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC/tcp/1234/unix/stdio", + } + + var resp []string + for _, addr := range addrs { + a, err := ma.NewMultiaddr(addr) + require.NoError(t, err) + + resp = append(resp, NamedAddr(a)) + } + + testutil.RequireGoldenJSON(t, resp) +} + +func TestDialErrMsgs(t *testing.T) { + ctx := context.Background() + + closed, err := NewConnGater(nil, nil) + require.NoError(t, err) + badAddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234/") + require.NoError(t, err) + + hostA := testutil.CreateHost(t, testutil.AvailableAddr(t)) + hostB := testutil.CreateHost(t, testutil.AvailableAddr(t), libp2p.ConnectionGater(closed)) + hostBAddr := hostB.Addrs()[0] + + hostA.Peerstore().AddAddr(hostB.ID(), hostBAddr, peerstore.TempAddrTTL) // Gater will block these + hostA.Peerstore().AddAddr(hostB.ID(), badAddr, peerstore.TempAddrTTL) // Connection refused + + _, err = hostA.Network().DialPeer(ctx, hostB.ID()) // Try dial + require.Error(t, err) + + msgs, ok := dialErrMsgs(err) + require.True(t, ok) + require.False(t, hasErrDialBackoff(err)) + require.Len(t, msgs, 2) + require.Contains(t, msgs[badAddr.String()], "connection refused") + require.Contains(t, msgs[hostBAddr.String()], "failed to negotiate stream multiplexer") + + _, err = hostA.Network().DialPeer(ctx, hostB.ID()) // Try dial again + require.Error(t, err) + + msgs, ok = dialErrMsgs(err) + require.True(t, ok) + require.True(t, hasErrDialBackoff(err)) + require.Len(t, msgs, 2) + require.Contains(t, msgs[badAddr.String()], "dial backoff") + require.Contains(t, msgs[hostBAddr.String()], "dial backoff") +} diff --git a/p2p/name.go b/p2p/name.go index f47973453..bc404928a 100644 --- a/p2p/name.go +++ b/p2p/name.go @@ -388,7 +388,7 @@ func randomName(pk ecdsa.PublicKey) string { //nolint:deadcode return fmt.Sprintf("%s-%s", adjectives[adjIdx], nouns[nounIdx]) } -// PeerName calculates the polynomial rolling hash of the peerID string. +// PeerName returns a deterministic pseudo random human friendly name for the peer ID. func PeerName(id peer.ID) string { // p is chosen to be 59 because it's prime and roughly equal to the no of different characters // you can have in base58 encoded strings. Base58 encoded strings can consist of 58 different diff --git a/p2p/ping.go b/p2p/ping.go index 106f26b00..2aa18356a 100644 --- a/p2p/ping.go +++ b/p2p/ping.go @@ -17,7 +17,7 @@ package p2p import ( "context" - "sync" + "fmt" "time" "github.com/libp2p/go-libp2p/core/host" @@ -27,16 +27,15 @@ import ( "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/obolnetwork/charon/app/errors" + "github.com/obolnetwork/charon/app/expbackoff" "github.com/obolnetwork/charon/app/log" "github.com/obolnetwork/charon/app/z" ) // NewPingService returns a start function of a p2p ping service that pings all peers every second // and collects metrics. -// TODO(corver): Cluster wide req/resp doesn't scale since it is O(n^2). func NewPingService(h host.Host, peers []peer.ID, callback func(peer.ID)) func(context.Context) { svc := ping.NewPingService(h) - logFunc := newPingLogger(peers) return func(ctx context.Context) { ctx = log.WithTopic(ctx, "ping") @@ -47,27 +46,26 @@ func NewPingService(h host.Host, peers []peer.ID, callback func(peer.ID)) func(c continue } - go pingPeer(ctx, svc, p, logFunc, callback) + go pingPeer(ctx, svc, p, callback) } } } // pingPeer starts (and restarts) a long-lived ping service stream, pinging the peer every second until some error. // It returns when the context is cancelled. -func pingPeer(ctx context.Context, svc *ping.PingService, p peer.ID, - logFunc func(context.Context, peer.ID, ping.Result), callback func(peer.ID), +func pingPeer(ctx context.Context, svc *ping.PingService, p peer.ID, callback func(peer.ID), ) { + backoff := expbackoff.New(ctx, expbackoff.WithMaxDelay(time.Second*30)) // Start quick, then slow down + logFunc := newPingLogger(svc.Host, p) for ctx.Err() == nil { pingPeerOnce(ctx, svc, p, logFunc, callback) - - const backoff = time.Second - time.Sleep(backoff) + backoff() } } // pingPeerOnce starts a long lived ping connection with the peer and returns on first error. func pingPeerOnce(ctx context.Context, svc *ping.PingService, p peer.ID, - logFunc func(context.Context, peer.ID, ping.Result), callback func(peer.ID), + logFunc func(context.Context, ping.Result), callback func(peer.ID), ) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -78,7 +76,7 @@ func pingPeerOnce(ctx context.Context, svc *ping.PingService, p peer.ID, return } - logFunc(ctx, p, result) + logFunc(ctx, result) if result.Error != nil { incPingError(p) @@ -88,6 +86,7 @@ func pingPeerOnce(ctx context.Context, svc *ping.PingService, p peer.ID, } observePing(p, result.RTT) + if callback != nil { callback(p) } @@ -100,54 +99,98 @@ func isRelayError(err error) bool { errors.Is(err, network.ErrResourceScopeClosed) } -// newPingLogger returns stateful logging function that logs ping failures -// and recoveries after applying hysteresis; only logging after N opposite results. -func newPingLogger(peers []peer.ID) func(context.Context, peer.ID, ping.Result) { - const hysteresis = 5 // N = 5 - +// newPingLogger returns stateful logging function that logs "real" dial errors when they change or every 10min. +// This is the main logger of "why are we not connected to peer X". +func newPingLogger(tcpNode host.Host, p peer.ID) func(context.Context, ping.Result) { var ( - mu sync.Mutex - first = make(map[peer.ID]bool) // first indicates if the peer has logged anything. - state = make(map[peer.ID]bool) // state indicates if the peer is ok or not - counts = make(map[peer.ID]int) // counts indicates number of successful pings; 0 <= x <= hysteresis - errs = make(map[peer.ID]error) // errs contains last non-dial backoff error + prevMsgs = make(map[string]string) + prevResolvedMsgs = make(map[string]string) + prevSuccess bool + clearedAt = time.Now() + clearPeriod = time.Minute * 10 // Log same msgs every 10min ) - for _, p := range peers { - state[p] = true - counts[p] = hysteresis - errs[p] = swarm.ErrDialBackoff // This will be over-written with no-dial-backoff errors if any. + sameMsgs := func(msgs map[string]string) bool { + return fmt.Sprint(msgs) == fmt.Sprint(prevMsgs) || fmt.Sprint(msgs) == fmt.Sprint(prevResolvedMsgs) } - return func(ctx context.Context, p peer.ID, result ping.Result) { - mu.Lock() - defer mu.Unlock() + return func(ctx context.Context, result ping.Result) { + if result.Error == nil && prevSuccess { + // All still good + return + } else if result.Error == nil && !prevSuccess { + // Reconnected + log.Info(ctx, "Peer connected", z.Str("peer", PeerName(p)), z.Any("rtt", result.RTT)) + prevSuccess = true - prev := counts[p] + return + } - if result.Error != nil && prev > 0 { - counts[p]-- // Decrease success count since ping failed. - } else if result.Error == nil && prev < hysteresis { - counts[p]++ // Increase success count since ping succeeded. + if time.Since(clearedAt) > clearPeriod { + prevMsgs = make(map[string]string) + prevResolvedMsgs = make(map[string]string) + clearedAt = time.Now() } - if result.Error != nil && !errors.Is(result.Error, swarm.ErrDialBackoff) { - errs[p] = result.Error // Dial backoff errors are not informative, cache others. + msgs, ok := dialErrMsgs(result.Error) + if !ok { // Unexpected non-dial reason... + if prevSuccess { + log.Warn(ctx, "Peer ping failing", nil, z.Str("peer", PeerName(p)), z.Str("error", result.Error.Error())) + } + prevSuccess = false + + return } - now := counts[p] - ok := state[p] - - if prev > 0 && now == 0 && ok { - log.Warn(ctx, "Peer ping failing", nil, z.Str("peer", PeerName(p)), z.Str("error", errs[p].Error())) - state[p] = false - first[p] = true - } else if prev < hysteresis && now == hysteresis && !ok { - log.Info(ctx, "Peer ping recovered", z.Str("peer", PeerName(p)), z.Any("rtt", result.RTT)) - state[p] = true - } else if result.Error == nil && !first[p] { - log.Info(ctx, "Peer ping success", z.Str("peer", PeerName(p)), z.Any("rtt", result.RTT)) - first[p] = true + if !prevSuccess && sameMsgs(msgs) { + // Still failing for the same reasons, don't log + return } + + prevMsgs = msgs + prevSuccess = false + + // Log when failing after success or failing for different reasons + + if hasErrDialBackoff(result.Error) { + msgs = resolveBackoffMsgs(ctx, tcpNode, p) // Best effort resolving of dial backoff errors. + if len(msgs) == 0 || sameMsgs(msgs) { + // No more errors, or same messages, ok well... + return + } + prevResolvedMsgs = msgs + } + + // TODO(corver): Reconsider this logging format + opts := []z.Field{z.Str("peer", PeerName(p))} + for addr, msg := range msgs { + opts = append(opts, z.Str(addr, msg)) + } + + log.Warn(ctx, "Peer not connected", nil, opts...) + } +} + +func resolveBackoffMsgs(ctx context.Context, tcpNode host.Host, p peer.ID) map[string]string { + net, ok := tcpNode.Network().(*swarm.Swarm) + if !ok { + log.Error(ctx, "Not a swarm network", nil) + return nil } + + net.Backoff().Clear(p) + + _, err := net.DialPeer(ctx, p) + if err == nil { + // Connected now.... + return nil + } + + msgs, ok := dialErrMsgs(err) + if !ok { // Some other error + log.Warn(ctx, "Peer dial failing", nil, z.Str("peer", PeerName(p)), z.Str("error", err.Error())) + return nil + } + + return msgs } diff --git a/p2p/sender.go b/p2p/sender.go index e8a3df8e7..cdf7ff43c 100644 --- a/p2p/sender.go +++ b/p2p/sender.go @@ -88,7 +88,11 @@ func (s *Sender) addResult(ctx context.Context, peerID peer.ID, err error) { } } else if failure && (len(state.buffer) == 1 || !state.failing) { // First attempt failed or state changed to failing - log.Warn(ctx, "P2P sending failing", err, z.Str("peer", PeerName(peerID))) + + if _, ok := dialErrMsgs(err); !ok { // Only log non-dial errors + log.Warn(ctx, "P2P sending failing", err, z.Str("peer", PeerName(peerID))) + } + state.failing = true } diff --git a/p2p/testdata/TestNamedAddr.golden b/p2p/testdata/TestNamedAddr.golden new file mode 100644 index 000000000..ff3fa1457 --- /dev/null +++ b/p2p/testdata/TestNamedAddr.golden @@ -0,0 +1,24 @@ +[ + "/p2p/anxious-dates/tcp/1234", + "/p2p/anxious-dates/tcp/1234", + "/ip4/127.0.0.1/udp/1234", + "/ip4/127.0.0.1/udp/0", + "/ip4/127.0.0.1/tcp/1234", + "/ip4/127.0.0.1/tcp/1234", + "/ip4/127.0.0.1/udp/1234/quic", + "/ip4/127.0.0.1/udp/1234/quic/webtransport", + "/ip4/127.0.0.1/udp/1234/quic/webtransport/certhash/u1QEQOFj2IjCsPJFfMAxmQxLGPw", + "/ip4/127.0.0.1/udp/1234/quic/webtransport/certhash/u1QEQOFj2IjCsPJFfMAxmQxLGPw/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g", + "/ip4/127.0.0.1/p2p/anxious-dates", + "/ip4/127.0.0.1/p2p/anxious-dates/tcp/1234", + "/ip4/127.0.0.1/p2p/anxious-dates", + "/ip4/127.0.0.1/p2p/anxious-dates/tcp/1234", + "/ip4/127.0.0.1/p2p/anxious-dates", + "/ip4/127.0.0.1/p2p/anxious-dates/tcp/1234", + "/ip4/127.0.0.1/p2p/anxious-dates", + "/ip4/127.0.0.1/p2p/anxious-dates/tcp/1234", + "/unix/a/b/c/d/e", + "/unix/stdio", + "/ip4/1.2.3.4/tcp/80/unix/a/b/c/d/e/f", + "/ip4/127.0.0.1/p2p/anxious-dates/tcp/1234/unix/stdio" +] \ No newline at end of file diff --git a/testutil/random.go b/testutil/random.go index 93afc50c5..4d609f3bd 100644 --- a/testutil/random.go +++ b/testutil/random.go @@ -535,7 +535,7 @@ func AvailableMultiAddr(t *testing.T) multiaddr.Multiaddr { return addr } -func CreateHost(t *testing.T, addr *net.TCPAddr) host.Host { +func CreateHost(t *testing.T, addr *net.TCPAddr, opts ...libp2p.Option) host.Host { t.Helper() pkey, _, err := p2pcrypto.GenerateSecp256k1Key(crand.Reader) require.NoError(t, err) @@ -543,7 +543,11 @@ func CreateHost(t *testing.T, addr *net.TCPAddr) host.Host { addrs, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", addr.IP, addr.Port)) require.NoError(t, err) - h, err := libp2p.New(libp2p.Identity(pkey), libp2p.ListenAddrs(addrs)) + opts2 := []libp2p.Option{libp2p.Identity(pkey), libp2p.ListenAddrs(addrs)} + opts2 = append(opts2, opts...) + + h, err := libp2p.New(opts2...) + require.NoError(t, err) return h From 82e2b90adec517e41e67be4f72a8c428f21a9537 Mon Sep 17 00:00:00 2001 From: corverroos Date: Sun, 21 Aug 2022 09:40:00 +0200 Subject: [PATCH 2/3] cleanup --- p2p/errors.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/errors.go b/p2p/errors.go index a4bac25ae..f570feea4 100644 --- a/p2p/errors.go +++ b/p2p/errors.go @@ -25,7 +25,7 @@ import ( "github.com/obolnetwork/charon/app/errors" ) -// hasErrDialBackoff returns true if the error is contains swarm.ErrDialBackoff. +// hasErrDialBackoff returns true if the error contains swarm.ErrDialBackoff. func hasErrDialBackoff(err error) bool { dErr := new(swarm.DialError) if !errors.As(err, &dErr) { From 845b9f7c4ea3b6d80b262a6e75b1ebebb3eccbd1 Mon Sep 17 00:00:00 2001 From: corverroos Date: Sun, 21 Aug 2022 13:10:45 +0200 Subject: [PATCH 3/3] cleanup --- p2p/errors.go | 2 +- p2p/errors_internal_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/errors.go b/p2p/errors.go index f570feea4..d8b3dd579 100644 --- a/p2p/errors.go +++ b/p2p/errors.go @@ -18,7 +18,7 @@ package p2p import ( "strings" - "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/net/swarm" ma "github.com/multiformats/go-multiaddr" diff --git a/p2p/errors_internal_test.go b/p2p/errors_internal_test.go index 9bc0ab3eb..863dfd11c 100644 --- a/p2p/errors_internal_test.go +++ b/p2p/errors_internal_test.go @@ -20,7 +20,7 @@ import ( "testing" "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p-core/peerstore" + "github.com/libp2p/go-libp2p/core/peerstore" ma "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require"