diff --git a/network/metrics.go b/network/metrics.go
index e2a3a363b40..c6b47a1360a 100644
--- a/network/metrics.go
+++ b/network/metrics.go
@@ -12,11 +12,13 @@ import (
     "github.com/ava-labs/avalanchego/ids"
     "github.com/ava-labs/avalanchego/network/peer"
     "github.com/ava-labs/avalanchego/utils"
-    "github.com/ava-labs/avalanchego/utils/constants"
     "github.com/ava-labs/avalanchego/utils/set"
 )
 
 type metrics struct {
+    // trackedSubnets does not include the primary network ID
+    trackedSubnets set.Set[ids.ID]
+
     numTracked     prometheus.Gauge
     numPeers       prometheus.Gauge
     numSubnetPeers *prometheus.GaugeVec
@@ -41,8 +43,13 @@ type metrics struct {
     peerConnectedStartTimesSum float64
 }
 
-func newMetrics(namespace string, registerer prometheus.Registerer, initialSubnetIDs set.Set[ids.ID]) (*metrics, error) {
+func newMetrics(
+    namespace string,
+    registerer prometheus.Registerer,
+    trackedSubnets set.Set[ids.ID],
+) (*metrics, error) {
     m := &metrics{
+        trackedSubnets: trackedSubnets,
         numPeers: prometheus.NewGauge(prometheus.GaugeOpts{
             Namespace: namespace,
             Name:      "peers",
@@ -169,11 +176,7 @@ func newMetrics(namespace string, registerer prometheus.Registerer, initialSubne
     )
 
     // init subnet tracker metrics with tracked subnets
-    for subnetID := range initialSubnetIDs {
-        // no need to track primary network ID
-        if subnetID == constants.PrimaryNetworkID {
-            continue
-        }
+    for subnetID := range trackedSubnets {
         // initialize to 0
         subnetIDStr := subnetID.String()
         m.numSubnetPeers.WithLabelValues(subnetIDStr).Set(0)
@@ -189,8 +192,10 @@ func (m *metrics) markConnected(peer peer.Peer) {
     m.connected.Inc()
 
     trackedSubnets := peer.TrackedSubnets()
-    for subnetID := range trackedSubnets {
-        m.numSubnetPeers.WithLabelValues(subnetID.String()).Inc()
+    for subnetID := range m.trackedSubnets {
+        if trackedSubnets.Contains(subnetID) {
+            m.numSubnetPeers.WithLabelValues(subnetID.String()).Inc()
+        }
     }
 
     m.lock.Lock()
@@ -206,8 +211,10 @@ func (m *metrics) markDisconnected(peer peer.Peer) {
     m.disconnected.Inc()
 
     trackedSubnets := peer.TrackedSubnets()
-    for subnetID := range trackedSubnets {
-        m.numSubnetPeers.WithLabelValues(subnetID.String()).Dec()
+    for subnetID := range m.trackedSubnets {
+        if trackedSubnets.Contains(subnetID) {
+            m.numSubnetPeers.WithLabelValues(subnetID.String()).Dec()
+        }
     }
 
     m.lock.Lock()
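Note on the metrics change above: numSubnetPeers is now driven by the intersection of the subnets this node tracks and the subnets the peer claims, so a peer can no longer create gauge labels for arbitrary subnet IDs. A minimal standalone sketch of that counting rule, with plain maps standing in for set.Set[ids.ID] and the Prometheus gauge vector (all names here are illustrative, not the real avalanchego types):

package main

import "fmt"

func main() {
    // What this node tracks (metrics.trackedSubnets; primary network excluded).
    mySubnets := map[string]bool{"subnetA": true, "subnetB": true}
    // What the peer claimed in its Handshake (peer.TrackedSubnets()).
    peerSubnets := map[string]bool{"primary": true, "subnetB": true, "subnetC": true}

    // Iterating over mySubnets bounds both the work and the metric label set
    // by what this node tracks, no matter how many subnets the peer claims.
    subnetPeers := map[string]int{}
    for subnetID := range mySubnets {
        if peerSubnets[subnetID] {
            subnetPeers[subnetID]++ // stands in for numSubnetPeers.WithLabelValues(...).Inc()
        }
    }
    fmt.Println(subnetPeers) // map[subnetB:1]
}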
diff --git a/network/network.go b/network/network.go
index a143e6202e9..9963612c016 100644
--- a/network/network.go
+++ b/network/network.go
@@ -460,8 +460,12 @@ func (n *network) Connected(nodeID ids.NodeID) {
     peerVersion := peer.Version()
     n.router.Connected(nodeID, peerVersion, constants.PrimaryNetworkID)
-    for subnetID := range peer.TrackedSubnets() {
-        n.router.Connected(nodeID, peerVersion, subnetID)
+
+    trackedSubnets := peer.TrackedSubnets()
+    for subnetID := range n.peerConfig.MySubnets {
+        if trackedSubnets.Contains(subnetID) {
+            n.router.Connected(nodeID, peerVersion, subnetID)
+        }
     }
 }
@@ -694,8 +698,7 @@ func (n *network) getPeers(
             continue
         }
 
-        trackedSubnets := peer.TrackedSubnets()
-        if subnetID != constants.PrimaryNetworkID && !trackedSubnets.Contains(subnetID) {
+        if trackedSubnets := peer.TrackedSubnets(); !trackedSubnets.Contains(subnetID) {
             continue
         }
 
@@ -731,8 +734,7 @@ func (n *network) samplePeers(
         numValidatorsToSample+config.NonValidators+config.Peers,
         func(p peer.Peer) bool {
             // Only return peers that are tracking [subnetID]
-            trackedSubnets := p.TrackedSubnets()
-            if subnetID != constants.PrimaryNetworkID && !trackedSubnets.Contains(subnetID) {
+            if trackedSubnets := p.TrackedSubnets(); !trackedSubnets.Contains(subnetID) {
                 return false
             }
diff --git a/network/peer/config.go b/network/peer/config.go
index 3eb8319216d..8aa12820cc4 100644
--- a/network/peer/config.go
+++ b/network/peer/config.go
@@ -33,13 +33,14 @@ type Config struct {
     Network              Network
     Router               router.InboundHandler
     VersionCompatibility version.Compatibility
-    MySubnets            set.Set[ids.ID]
-    Beacons              validators.Manager
-    Validators           validators.Manager
-    NetworkID            uint32
-    PingFrequency        time.Duration
-    PongTimeout          time.Duration
-    MaxClockDifference   time.Duration
+    // MySubnets does not include the primary network ID
+    MySubnets          set.Set[ids.ID]
+    Beacons            validators.Manager
+    Validators         validators.Manager
+    NetworkID          uint32
+    PingFrequency      time.Duration
+    PongTimeout        time.Duration
+    MaxClockDifference time.Duration
 
     SupportedACPs []uint32
     ObjectedACPs  []uint32
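The getPeers and samplePeers guards above drop the explicit primary-network check. This relies on the invariant introduced in the peer.go change below: a peer's tracked set always contains the primary network ID. A small self-contained check that the old and new skip predicates agree under that invariant (strings stand in for ids.ID; the values are hypothetical):

package main

import "fmt"

func main() {
    const primaryNetworkID = "primary"

    // Invariant from the handshake change: the primary network ID is always
    // present in a peer's tracked set.
    trackedSubnets := map[string]bool{primaryNetworkID: true, "subnetA": true}

    for _, subnetID := range []string{primaryNetworkID, "subnetA", "subnetB"} {
        oldSkip := subnetID != primaryNetworkID && !trackedSubnets[subnetID]
        newSkip := !trackedSubnets[subnetID]
        fmt.Printf("%s: oldSkip=%t newSkip=%t\n", subnetID, oldSkip, newSkip)
    }
    // The two predicates agree on every ID; without the invariant they would
    // disagree on the primary network ID.
}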
diff --git a/network/peer/peer.go b/network/peer/peer.go
index 4352e21a7af..a87bca70854 100644
--- a/network/peer/peer.go
+++ b/network/peer/peer.go
@@ -35,6 +35,9 @@
     // maxBloomSaltLen restricts the allowed size of the bloom salt to prevent
     // excessively expensive bloom filter contains checks.
     maxBloomSaltLen = 32
+    // maxNumTrackedSubnets limits how many subnets a peer can track to prevent
+    // excessive memory usage.
+    maxNumTrackedSubnets = 16
 
     disconnectingLog         = "disconnecting from peer"
     failedToCreateMessageLog = "failed to create message"
@@ -139,8 +142,8 @@ type peer struct {
     // version is the claimed version the peer is running that we received in
     // the Handshake message.
     version *version.Application
-    // trackedSubnets is the subset of subnetIDs the peer sent us in the Handshake
-    // message that we are also tracking.
+    // trackedSubnets are the subnetIDs the peer sent us in the Handshake
+    // message. The primary network ID is always included.
     trackedSubnets set.Set[ids.ID]
     // options of ACPs provided in the Handshake message.
     supportedACPs set.Set[uint32]
@@ -271,9 +274,8 @@ func (p *peer) Info() Info {
         publicIPStr = p.ip.IPPort.String()
     }
 
-    uptimes := make(map[ids.ID]json.Uint32, p.trackedSubnets.Len())
-
-    for subnetID := range p.trackedSubnets {
+    uptimes := make(map[ids.ID]json.Uint32, p.MySubnets.Len())
+    for subnetID := range p.MySubnets {
         uptime, exist := p.ObservedUptime(subnetID)
         if !exist {
             continue
@@ -851,8 +853,12 @@ func (p *peer) getUptimes() (uint32, []*p2p.SubnetUptime) {
         primaryUptime = 0
     }
 
-    subnetUptimes := make([]*p2p.SubnetUptime, 0, p.trackedSubnets.Len())
-    for subnetID := range p.trackedSubnets {
+    subnetUptimes := make([]*p2p.SubnetUptime, 0, p.MySubnets.Len())
+    for subnetID := range p.MySubnets {
+        if !p.trackedSubnets.Contains(subnetID) {
+            continue
+        }
+
         subnetUptime, err := p.UptimeCalculator.CalculateUptimePercent(p.id, subnetID)
         if err != nil {
             p.Log.Debug(failedToGetUptimeLog,
@@ -951,6 +957,18 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) {
     }
 
     // handle subnet IDs
+    if numTrackedSubnets := len(msg.TrackedSubnets); numTrackedSubnets > maxNumTrackedSubnets {
+        p.Log.Debug(malformedMessageLog,
+            zap.Stringer("nodeID", p.id),
+            zap.Stringer("messageOp", message.HandshakeOp),
+            zap.String("field", "trackedSubnets"),
+            zap.Int("numTrackedSubnets", numTrackedSubnets),
+        )
+        p.StartClose()
+        return
+    }
+
+    p.trackedSubnets.Add(constants.PrimaryNetworkID)
     for _, subnetIDBytes := range msg.TrackedSubnets {
         subnetID, err := ids.ToID(subnetIDBytes)
         if err != nil {
@@ -963,10 +981,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) {
             p.StartClose()
             return
         }
-        // add only if we also track this subnet
-        if p.MySubnets.Contains(subnetID) {
-            p.trackedSubnets.Add(subnetID)
-        }
+        p.trackedSubnets.Add(subnetID)
     }
 
     for _, acp := range msg.SupportedAcps {
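A condensed sketch of the new handleHandshake flow above, under simplifying assumptions (strings instead of ids.ID byte slices, and a returned error instead of the malformed-message log plus StartClose):

package main

import (
    "errors"
    "fmt"
)

const (
    primaryNetworkID     = "primary"
    maxNumTrackedSubnets = 16 // mirrors the constant added in peer.go
)

var errTooManySubnets = errors.New("disconnecting from peer: too many tracked subnets")

// parseTrackedSubnets mirrors the handshake handling: reject oversized claims
// up front, then record the peer's subnets verbatim, with the primary network
// ID always included.
func parseTrackedSubnets(claimed []string) (map[string]bool, error) {
    if len(claimed) > maxNumTrackedSubnets {
        return nil, errTooManySubnets
    }
    tracked := map[string]bool{primaryNetworkID: true}
    for _, subnetID := range claimed {
        tracked[subnetID] = true
    }
    return tracked, nil
}

func main() {
    tracked, err := parseTrackedSubnets([]string{"subnetA"})
    fmt.Println(tracked, err) // map[primary:true subnetA:true] <nil>

    _, err = parseTrackedSubnets(make([]string, maxNumTrackedSubnets+1))
    fmt.Println(err) // disconnecting from peer: too many tracked subnets
}

Note the design shift: trackedSubnets now stores everything the peer claims (bounded to 16 entries) rather than the pre-filtered intersection, so each consumer intersects with MySubnets itself.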
diff --git a/network/peer/peer_test.go b/network/peer/peer_test.go
index 30dc817c517..ffd5915aa2c 100644
--- a/network/peer/peer_test.go
+++ b/network/peer/peer_test.go
@@ -39,7 +39,6 @@ type testPeer struct {
 
 type rawTestPeer struct {
     config         *Config
-    conn           net.Conn
     cert           *staking.Certificate
     nodeID         ids.NodeID
     inboundMsgChan <-chan message.InboundMessage
@@ -60,27 +59,10 @@ func newMessageCreator(t *testing.T) message.Creator {
     return mc
 }
 
-func makeRawTestPeers(t *testing.T, trackedSubnets set.Set[ids.ID]) (*rawTestPeer, *rawTestPeer) {
+func newConfig(t *testing.T) Config {
     t.Helper()
     require := require.New(t)
 
-    conn0, conn1 := net.Pipe()
-
-    tlsCert0, err := staking.NewTLSCert()
-    require.NoError(err)
-    cert0, err := staking.ParseCertificate(tlsCert0.Leaf.Raw)
-    require.NoError(err)
-
-    tlsCert1, err := staking.NewTLSCert()
-    require.NoError(err)
-    cert1, err := staking.ParseCertificate(tlsCert1.Leaf.Raw)
-    require.NoError(err)
-
-    nodeID0 := ids.NodeIDFromCert(cert0)
-    nodeID1 := ids.NodeIDFromCert(cert1)
-
-    mc := newMessageCreator(t)
-
     metrics, err := NewMetrics(
         "",
         prometheus.NewRegistry(),
@@ -95,14 +77,17 @@ func makeRawTestPeers(t *testing.T, trackedSubnets set.Set[ids.ID]) (*rawTestPee
     )
     require.NoError(err)
 
-    sharedConfig := Config{
+    return Config{
+        ReadBufferSize:       constants.DefaultNetworkPeerReadBufferSize,
+        WriteBufferSize:      constants.DefaultNetworkPeerWriteBufferSize,
         Metrics:              metrics,
-        MessageCreator:       mc,
+        MessageCreator:       newMessageCreator(t),
         Log:                  logging.NoLog{},
         InboundMsgThrottler:  throttling.NewNoInboundThrottler(),
+        Network:              TestNetwork,
+        Router:               nil,
         VersionCompatibility: version.GetCompatibility(constants.LocalID),
-        MySubnets:            trackedSubnets,
-        UptimeCalculator:     uptime.NoOpCalculator,
+        MySubnets:            nil,
         Beacons:              validators.NewManager(),
         Validators:           validators.NewManager(),
         NetworkID:            constants.LocalID,
@@ -110,141 +95,91 @@
         PongTimeout:        constants.DefaultPingPongTimeout,
         MaxClockDifference: time.Minute,
         ResourceTracker:    resourceTracker,
+        UptimeCalculator:   uptime.NoOpCalculator,
+        IPSigner:           nil,
     }
-
-    peerConfig0 := sharedConfig
-    peerConfig1 := sharedConfig
-
-    ip0 := ips.NewDynamicIPPort(net.IPv6loopback, 1)
-    tls0 := tlsCert0.PrivateKey.(crypto.Signer)
-    bls0, err := bls.NewSecretKey()
-    require.NoError(err)
+}
 
-    peerConfig0.IPSigner = NewIPSigner(ip0, tls0, bls0)
+func newRawTestPeer(t *testing.T, config Config) *rawTestPeer {
+    t.Helper()
+    require := require.New(t)
 
-    peerConfig0.Network = TestNetwork
-    inboundMsgChan0 := make(chan message.InboundMessage)
-    peerConfig0.Router = router.InboundHandlerFunc(func(_ context.Context, msg message.InboundMessage) {
-        inboundMsgChan0 <- msg
-    })
+    tlsCert, err := staking.NewTLSCert()
+    require.NoError(err)
+    cert, err := staking.ParseCertificate(tlsCert.Leaf.Raw)
+    require.NoError(err)
+    nodeID := ids.NodeIDFromCert(cert)
 
-    ip1 := ips.NewDynamicIPPort(net.IPv6loopback, 2)
-    tls1 := tlsCert1.PrivateKey.(crypto.Signer)
-    bls1, err := bls.NewSecretKey()
+    ip := ips.NewDynamicIPPort(net.IPv6loopback, 1)
+    tls := tlsCert.PrivateKey.(crypto.Signer)
+    bls, err := bls.NewSecretKey()
     require.NoError(err)
 
-    peerConfig1.IPSigner = NewIPSigner(ip1, tls1, bls1)
+    config.IPSigner = NewIPSigner(ip, tls, bls)
 
-    peerConfig1.Network = TestNetwork
-    inboundMsgChan1 := make(chan message.InboundMessage)
-    peerConfig1.Router = router.InboundHandlerFunc(func(_ context.Context, msg message.InboundMessage) {
-        inboundMsgChan1 <- msg
+    inboundMsgChan := make(chan message.InboundMessage)
+    config.Router = router.InboundHandlerFunc(func(_ context.Context, msg message.InboundMessage) {
+        inboundMsgChan <- msg
     })
 
-    peer0 := &rawTestPeer{
-        config:         &peerConfig0,
-        conn:           conn0,
-        cert:           cert0,
-        nodeID:         nodeID0,
-        inboundMsgChan: inboundMsgChan0,
-    }
-    peer1 := &rawTestPeer{
-        config:         &peerConfig1,
-        conn:           conn1,
-        cert:           cert1,
-        nodeID:         nodeID1,
-        inboundMsgChan: inboundMsgChan1,
+    return &rawTestPeer{
+        config:         &config,
+        cert:           cert,
+        nodeID:         nodeID,
+        inboundMsgChan: inboundMsgChan,
     }
-    return peer0, peer1
 }
 
-func makeTestPeers(t *testing.T, trackedSubnets set.Set[ids.ID]) (*testPeer, *testPeer) {
-    rawPeer0, rawPeer1 := makeRawTestPeers(t, trackedSubnets)
-
-    peer0 := &testPeer{
-        Peer: Start(
-            rawPeer0.config,
-            rawPeer0.conn,
-            rawPeer1.cert,
-            rawPeer1.nodeID,
-            NewThrottledMessageQueue(
-                rawPeer0.config.Metrics,
-                rawPeer1.nodeID,
-                logging.NoLog{},
-                throttling.NewNoOutboundThrottler(),
-            ),
-        ),
-        inboundMsgChan: rawPeer0.inboundMsgChan,
-    }
-    peer1 := &testPeer{
+func startTestPeer(self *rawTestPeer, peer *rawTestPeer, conn net.Conn) *testPeer {
+    return &testPeer{
         Peer: Start(
-            rawPeer1.config,
-            rawPeer1.conn,
-            rawPeer0.cert,
-            rawPeer0.nodeID,
+            self.config,
+            conn,
+            peer.cert,
+            peer.nodeID,
             NewThrottledMessageQueue(
-                rawPeer1.config.Metrics,
-                rawPeer0.nodeID,
+                self.config.Metrics,
+                peer.nodeID,
                 logging.NoLog{},
                 throttling.NewNoOutboundThrottler(),
             ),
         ),
-        inboundMsgChan: rawPeer1.inboundMsgChan,
+        inboundMsgChan: self.inboundMsgChan,
     }
+}
+
+func startTestPeers(rawPeer0 *rawTestPeer, rawPeer1 *rawTestPeer) (*testPeer, *testPeer) {
+    conn0, conn1 := net.Pipe()
+    peer0 := startTestPeer(rawPeer0, rawPeer1, conn0)
+    peer1 := startTestPeer(rawPeer1, rawPeer0, conn1)
     return peer0, peer1
 }
 
-func makeReadyTestPeers(t *testing.T, trackedSubnets set.Set[ids.ID]) (*testPeer, *testPeer) {
+func awaitReady(t *testing.T, peers ...Peer) {
     t.Helper()
     require := require.New(t)
 
-    peer0, peer1 := makeTestPeers(t, trackedSubnets)
-
-    require.NoError(peer0.AwaitReady(context.Background()))
-    require.True(peer0.Ready())
-
-    require.NoError(peer1.AwaitReady(context.Background()))
-    require.True(peer1.Ready())
-
-    return peer0, peer1
+    for _, peer := range peers {
+        require.NoError(peer.AwaitReady(context.Background()))
+        require.True(peer.Ready())
+    }
 }
 
 func TestReady(t *testing.T) {
     require := require.New(t)
 
-    rawPeer0, rawPeer1 := makeRawTestPeers(t, set.Set[ids.ID]{})
-    peer0 := Start(
-        rawPeer0.config,
-        rawPeer0.conn,
-        rawPeer1.cert,
-        rawPeer1.nodeID,
-        NewThrottledMessageQueue(
-            rawPeer0.config.Metrics,
-            rawPeer1.nodeID,
-            logging.NoLog{},
-            throttling.NewNoOutboundThrottler(),
-        ),
-    )
+    config := newConfig(t)
 
-    require.False(peer0.Ready())
+    rawPeer0 := newRawTestPeer(t, config)
+    rawPeer1 := newRawTestPeer(t, config)
 
-    peer1 := Start(
-        rawPeer1.config,
-        rawPeer1.conn,
-        rawPeer0.cert,
-        rawPeer0.nodeID,
-        NewThrottledMessageQueue(
-            rawPeer1.config.Metrics,
-            rawPeer0.nodeID,
-            logging.NoLog{},
-            throttling.NewNoOutboundThrottler(),
-        ),
-    )
+    conn0, conn1 := net.Pipe()
 
-    require.NoError(peer0.AwaitReady(context.Background()))
-    require.True(peer0.Ready())
+    peer0 := startTestPeer(rawPeer0, rawPeer1, conn0)
+    require.False(peer0.Ready())
 
-    require.NoError(peer1.AwaitReady(context.Background()))
-    require.True(peer1.Ready())
+    peer1 := startTestPeer(rawPeer1, rawPeer0, conn1)
+    awaitReady(t, peer0, peer1)
 
     peer0.StartClose()
     require.NoError(peer0.AwaitClosed(context.Background()))
@@ -254,10 +189,15 @@
 func TestSend(t *testing.T) {
     require := require.New(t)
 
-    peer0, peer1 := makeReadyTestPeers(t, set.Set[ids.ID]{})
-    mc := newMessageCreator(t)
+    sharedConfig := newConfig(t)
 
-    outboundGetMsg, err := mc.Get(ids.Empty, 1, time.Second, ids.Empty)
+    rawPeer0 := newRawTestPeer(t, sharedConfig)
+    rawPeer1 := newRawTestPeer(t, sharedConfig)
+
+    peer0, peer1 := startTestPeers(rawPeer0, rawPeer1)
+    awaitReady(t, peer0, peer1)
+
+    outboundGetMsg, err := sharedConfig.MessageCreator.Get(ids.Empty, 1, time.Second, ids.Empty)
     require.NoError(err)
 
     require.True(peer0.Send(context.Background(), outboundGetMsg))
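Context for the TestPingUptimes cases that follow: after the peer.go changes, a node sends uptimes only for subnets in its MySubnets that the peer also tracks, and (as the shouldClose case below suggests) treats a received uptime for a subnet it does not track as malformed. A rough sketch of both sides, with simplified types and illustrative names rather than the real p2p message handling:

package main

import (
    "errors"
    "fmt"
)

var errUntrackedSubnet = errors.New("received uptime for untracked subnet")

// uptimesToSend mirrors getUptimes: report only mutually tracked subnets.
func uptimesToSend(mySubnets, peerTracked map[string]bool) []string {
    var report []string
    for subnetID := range mySubnets {
        if peerTracked[subnetID] {
            report = append(report, subnetID)
        }
    }
    return report
}

// observeUptimes sketches the receive side: an uptime for a subnet we do not
// track is malformed and ends the connection.
func observeUptimes(received []string, mySubnets map[string]bool) error {
    for _, subnetID := range received {
        if !mySubnets[subnetID] {
            return errUntrackedSubnet
        }
    }
    return nil
}

func main() {
    mySubnets := map[string]bool{"tracked": true}
    peerTracked := map[string]bool{"primary": true, "tracked": true}
    fmt.Println(uptimesToSend(mySubnets, peerTracked))            // [tracked]
    fmt.Println(observeUptimes([]string{"untracked"}, mySubnets)) // error -> close the peer
}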
@@ -274,9 +214,8 @@ func TestPingUptimes(t *testing.T) {
     trackedSubnetID := ids.GenerateTestID()
     untrackedSubnetID := ids.GenerateTestID()
 
-    trackedSubnets := set.Of(trackedSubnetID)
-
-    mc := newMessageCreator(t)
+    sharedConfig := newConfig(t)
+    sharedConfig.MySubnets = set.Of(trackedSubnetID)
 
     testCases := []struct {
         name        string
         msg         message.OutboundMessage
         shouldClose bool
         assertFn    func(*require.Assertions, *testPeer)
     }{
         {
             name: "primary network only",
             msg: func() message.OutboundMessage {
-                pingMsg, err := mc.Ping(1, nil)
+                pingMsg, err := sharedConfig.MessageCreator.Ping(1, nil)
                 require.NoError(t, err)
                 return pingMsg
             }(),
+            shouldClose: false,
             assertFn: func(require *require.Assertions, peer *testPeer) {
                 uptime, ok := peer.ObservedUptime(constants.PrimaryNetworkID)
                 require.True(ok)
@@ -304,7 +244,7 @@
         {
             name: "primary network and subnet",
             msg: func() message.OutboundMessage {
-                pingMsg, err := mc.Ping(
+                pingMsg, err := sharedConfig.MessageCreator.Ping(
                     1,
                     []*p2p.SubnetUptime{
                         {
@@ -316,6 +256,7 @@
                 require.NoError(t, err)
                 return pingMsg
             }(),
+            shouldClose: false,
             assertFn: func(require *require.Assertions, peer *testPeer) {
                 uptime, ok := peer.ObservedUptime(constants.PrimaryNetworkID)
                 require.True(ok)
@@ -329,7 +270,7 @@
         {
             name: "primary network and non tracked subnet",
             msg: func() message.OutboundMessage {
-                pingMsg, err := mc.Ping(
+                pingMsg, err := sharedConfig.MessageCreator.Ping(
                     1,
                     []*p2p.SubnetUptime{
                         {
@@ -348,27 +289,30 @@
                 return pingMsg
             }(),
             shouldClose: true,
+            assertFn:    nil,
         },
     }
 
-    // Note: we reuse peers across tests because makeReadyTestPeers takes awhile
-    // to run.
-    peer0, peer1 := makeReadyTestPeers(t, trackedSubnets)
-    defer func() {
-        peer1.StartClose()
-        peer0.StartClose()
-        require.NoError(t, peer0.AwaitClosed(context.Background()))
-        require.NoError(t, peer1.AwaitClosed(context.Background()))
-    }()
+    // The raw peers are generated outside of the test cases to avoid generating
+    // many TLS keys.
+    rawPeer0 := newRawTestPeer(t, sharedConfig)
+    rawPeer1 := newRawTestPeer(t, sharedConfig)
 
     for _, tc := range testCases {
         t.Run(tc.name, func(t *testing.T) {
             require := require.New(t)
 
+            peer0, peer1 := startTestPeers(rawPeer0, rawPeer1)
+            awaitReady(t, peer0, peer1)
+            defer func() {
+                peer1.StartClose()
+                peer0.StartClose()
+                require.NoError(peer0.AwaitClosed(context.Background()))
+                require.NoError(peer1.AwaitClosed(context.Background()))
+            }()
+
             require.True(peer0.Send(context.Background(), tc.msg))
 
-            // Note: shouldClose can only be `true` for the last test because
-            // we reuse peers across tests.
             if tc.shouldClose {
                 require.NoError(peer1.AwaitClosed(context.Background()))
                 return
@@ -385,11 +329,85 @@
     }
 }
 
+func TestTrackedSubnets(t *testing.T) {
+    sharedConfig := newConfig(t)
+    rawPeer0 := newRawTestPeer(t, sharedConfig)
+    rawPeer1 := newRawTestPeer(t, sharedConfig)
+
+    makeSubnetIDs := func(numSubnets int) []ids.ID {
+        subnetIDs := make([]ids.ID, numSubnets)
+        for i := range subnetIDs {
+            subnetIDs[i] = ids.GenerateTestID()
+        }
+        return subnetIDs
+    }
+
+    tests := []struct {
+        name             string
+        trackedSubnets   []ids.ID
+        shouldDisconnect bool
+    }{
+        {
+            name:             "primary network only",
+            trackedSubnets:   makeSubnetIDs(0),
+            shouldDisconnect: false,
+        },
+        {
+            name:             "single subnet",
+            trackedSubnets:   makeSubnetIDs(1),
+            shouldDisconnect: false,
+        },
+        {
+            name:             "max subnets",
+            trackedSubnets:   makeSubnetIDs(maxNumTrackedSubnets),
+            shouldDisconnect: false,
+        },
+        {
+            name:             "too many subnets",
+            trackedSubnets:   makeSubnetIDs(maxNumTrackedSubnets + 1),
+            shouldDisconnect: true,
+        },
+    }
+
+    for _, test := range tests {
+        t.Run(test.name, func(t *testing.T) {
+            require := require.New(t)
+
+            rawPeer0.config.MySubnets = set.Of(test.trackedSubnets...)
+            peer0, peer1 := startTestPeers(rawPeer0, rawPeer1)
+            if test.shouldDisconnect {
+                require.NoError(peer0.AwaitClosed(context.Background()))
+                require.NoError(peer1.AwaitClosed(context.Background()))
+                return
+            }
+
+            defer func() {
+                peer1.StartClose()
+                peer0.StartClose()
+                require.NoError(peer0.AwaitClosed(context.Background()))
+                require.NoError(peer1.AwaitClosed(context.Background()))
+            }()
+
+            awaitReady(t, peer0, peer1)
+
+            require.Equal(set.Of(constants.PrimaryNetworkID), peer0.TrackedSubnets())
+
+            expectedTrackedSubnets := set.Of(test.trackedSubnets...)
+            expectedTrackedSubnets.Add(constants.PrimaryNetworkID)
+            require.Equal(expectedTrackedSubnets, peer1.TrackedSubnets())
+        })
+    }
+}
+
 // Test that a peer using the wrong BLS key is disconnected from.
 func TestInvalidBLSKeyDisconnects(t *testing.T) {
     require := require.New(t)
 
-    rawPeer0, rawPeer1 := makeRawTestPeers(t, nil)
+    sharedConfig := newConfig(t)
+
+    rawPeer0 := newRawTestPeer(t, sharedConfig)
+    rawPeer1 := newRawTestPeer(t, sharedConfig)
+
     require.NoError(rawPeer0.config.Validators.AddStaker(
         constants.PrimaryNetworkID,
         rawPeer1.nodeID,
@@ -407,36 +425,8 @@
         ids.GenerateTestID(),
         1,
     ))
-    peer0 := &testPeer{
-        Peer: Start(
-            rawPeer0.config,
-            rawPeer0.conn,
-            rawPeer1.cert,
-            rawPeer1.nodeID,
-            NewThrottledMessageQueue(
-                rawPeer0.config.Metrics,
-                rawPeer1.nodeID,
-                logging.NoLog{},
-                throttling.NewNoOutboundThrottler(),
-            ),
-        ),
-        inboundMsgChan: rawPeer0.inboundMsgChan,
-    }
-    peer1 := &testPeer{
-        Peer: Start(
-            rawPeer1.config,
-            rawPeer1.conn,
-            rawPeer0.cert,
-            rawPeer0.nodeID,
-            NewThrottledMessageQueue(
-                rawPeer1.config.Metrics,
-                rawPeer0.nodeID,
-                logging.NoLog{},
-                throttling.NewNoOutboundThrottler(),
-            ),
-        ),
-        inboundMsgChan: rawPeer1.inboundMsgChan,
-    }
+
+    peer0, peer1 := startTestPeers(rawPeer0, rawPeer1)
 
     // Because peer1 thinks that peer0 is using the wrong BLS key, they should
     // disconnect from each other.