From 9177897c2624353cbcf14020122cee661cf15740 Mon Sep 17 00:00:00 2001 From: corverroos Date: Thu, 22 Jun 2023 07:34:01 -0400 Subject: [PATCH] p2p: remove support for legacy stream delimited protocols --- app/peerinfo/adhoc.go | 4 +-- app/peerinfo/peerinfo.go | 13 +++---- core/consensus/component.go | 7 ++-- core/consensus/transport.go | 4 +-- core/parsigex/parsigex.go | 7 ++-- core/priority/prioritiser.go | 10 +++--- dkg/bcast/client.go | 4 +-- dkg/bcast/server.go | 2 -- dkg/frostp2p.go | 3 +- p2p/receive_test.go | 66 +++--------------------------------- p2p/sender.go | 57 ++----------------------------- 11 files changed, 29 insertions(+), 148 deletions(-) diff --git a/app/peerinfo/adhoc.go b/app/peerinfo/adhoc.go index 057c0d102..1d9bd1a66 100644 --- a/app/peerinfo/adhoc.go +++ b/app/peerinfo/adhoc.go @@ -24,8 +24,8 @@ func DoOnce(ctx context.Context, tcpNode host.Host, peerID peer.ID) (*pbv1.PeerI req := new(pbv1.PeerInfo) // TODO(corver): Populate request fields and make them required. resp := new(pbv1.PeerInfo) - err := p2p.SendReceive(ctx, tcpNode, peerID, req, resp, protocolID1, - p2p.WithSendReceiveRTT(rttCallback), p2p.WithDelimitedProtocol(protocolID2)) + err := p2p.SendReceive(ctx, tcpNode, peerID, req, resp, protocolID2, + p2p.WithSendReceiveRTT(rttCallback)) if err != nil { return nil, 0, false, err } diff --git a/app/peerinfo/peerinfo.go b/app/peerinfo/peerinfo.go index 4e1fda820..93e865de6 100644 --- a/app/peerinfo/peerinfo.go +++ b/app/peerinfo/peerinfo.go @@ -25,15 +25,13 @@ import ( ) const ( - period = time.Minute - - protocolID1 protocol.ID = "/charon/peerinfo/1.0.0" + period = time.Minute protocolID2 protocol.ID = "/charon/peerinfo/2.0.0" ) // Protocols returns the supported protocols of this package in order of precedence. func Protocols() []protocol.ID { - return []protocol.ID{protocolID2, protocolID1} + return []protocol.ID{protocolID2} } type ( @@ -82,7 +80,7 @@ func newInternal(tcpNode host.Host, peers []peer.ID, version version.SemVer, loc startTime := timestamppb.New(nowFunc()) // Register a simple handler that returns our info and ignores the request. - registerHandler("peerinfo", tcpNode, protocolID1, + registerHandler("peerinfo", tcpNode, protocolID2, func() proto.Message { return new(pbv1.PeerInfo) }, func(context.Context, peer.ID, proto.Message) (proto.Message, bool, error) { return &pbv1.PeerInfo{ @@ -93,7 +91,6 @@ func newInternal(tcpNode host.Host, peers []peer.ID, version version.SemVer, loc StartedAt: startTime, }, true, nil }, - p2p.WithDelimitedProtocol(protocolID2), ) // Create log filters @@ -173,8 +170,8 @@ func (p *PeerInfo) sendOnce(ctx context.Context, now time.Time) { } resp := new(pbv1.PeerInfo) - err := p.sendFunc(ctx, p.tcpNode, peerID, req, resp, protocolID1, - p2p.WithSendReceiveRTT(rttCallback), p2p.WithDelimitedProtocol(protocolID2)) + err := p.sendFunc(ctx, p.tcpNode, peerID, req, resp, protocolID2, + p2p.WithSendReceiveRTT(rttCallback)) if err != nil { return // Logging handled by send func. } else if resp.SentAt == nil || resp.StartedAt == nil { diff --git a/core/consensus/component.go b/core/consensus/component.go index c0d0814c0..6d2d841d5 100644 --- a/core/consensus/component.go +++ b/core/consensus/component.go @@ -28,13 +28,12 @@ import ( const ( recvBuffer = 100 // Allow buffering some initial messages when this node is late to start an instance. - protocolID1 = "/charon/consensus/qbft/1.0.0" protocolID2 = "/charon/consensus/qbft/2.0.0" ) // Protocols returns the supported protocols of this package in order of precedence. func Protocols() []protocol.ID { - return []protocol.ID{protocolID2, protocolID1} + return []protocol.ID{protocolID2} } type subscriber func(ctx context.Context, duty core.Duty, value proto.Message) error @@ -227,9 +226,9 @@ func (c *Component) SubscribePriority(fn func(ctx context.Context, duty core.Dut // Start registers the libp2p receive handler and starts a goroutine that cleans state. This should only be called once. func (c *Component) Start(ctx context.Context) { - p2p.RegisterHandler("qbft", c.tcpNode, protocolID1, + p2p.RegisterHandler("qbft", c.tcpNode, protocolID2, func() proto.Message { return new(pbv1.ConsensusMsg) }, - c.handle, p2p.WithDelimitedProtocol(protocolID2)) + c.handle) go func() { for { diff --git a/core/consensus/transport.go b/core/consensus/transport.go index bed829b78..efcaebdc8 100644 --- a/core/consensus/transport.go +++ b/core/consensus/transport.go @@ -17,7 +17,6 @@ import ( "github.com/obolnetwork/charon/core" pbv1 "github.com/obolnetwork/charon/core/corepb/v1" "github.com/obolnetwork/charon/core/qbft" - "github.com/obolnetwork/charon/p2p" ) // transport encapsulates receiving and broadcasting for a consensus instance/duty. @@ -129,8 +128,7 @@ func (t *transport) Broadcast(ctx context.Context, typ qbft.MsgType, duty core.D continue } - err = t.component.sender.SendAsync(ctx, t.component.tcpNode, protocolID1, p.ID, msg.ToConsensusMsg(), - p2p.WithDelimitedProtocol(protocolID2)) + err = t.component.sender.SendAsync(ctx, t.component.tcpNode, protocolID2, p.ID, msg.ToConsensusMsg()) if err != nil { return err } diff --git a/core/parsigex/parsigex.go b/core/parsigex/parsigex.go index c4be18ba6..cac32738a 100644 --- a/core/parsigex/parsigex.go +++ b/core/parsigex/parsigex.go @@ -21,13 +21,12 @@ import ( ) const ( - protocolID1 = "/charon/parsigex/1.0.0" protocolID2 = "/charon/parsigex/2.0.0" ) // Protocols returns the supported protocols of this package in order of precedence. func Protocols() []protocol.ID { - return []protocol.ID{protocolID2, protocolID1} + return []protocol.ID{protocolID2} } func NewParSigEx(tcpNode host.Host, sendFunc p2p.SendFunc, peerIdx int, peers []peer.ID, verifyFunc func(context.Context, core.Duty, core.PubKey, core.ParSignedData) error) *ParSigEx { @@ -40,7 +39,7 @@ func NewParSigEx(tcpNode host.Host, sendFunc p2p.SendFunc, peerIdx int, peers [] } newReq := func() proto.Message { return new(pbv1.ParSigExMsg) } - p2p.RegisterHandler("parsigex", tcpNode, protocolID1, newReq, parSigEx.handle, p2p.WithDelimitedProtocol(protocolID2)) + p2p.RegisterHandler("parsigex", tcpNode, protocolID2, newReq, parSigEx.handle) return parSigEx } @@ -115,7 +114,7 @@ func (m *ParSigEx) Broadcast(ctx context.Context, duty core.Duty, set core.ParSi continue } - if err := m.sendFunc(ctx, m.tcpNode, protocolID1, p, &msg, p2p.WithDelimitedProtocol(protocolID2)); err != nil { + if err := m.sendFunc(ctx, m.tcpNode, protocolID2, p, &msg); err != nil { return err } } diff --git a/core/priority/prioritiser.go b/core/priority/prioritiser.go index 180b24295..934646b67 100644 --- a/core/priority/prioritiser.go +++ b/core/priority/prioritiser.go @@ -37,13 +37,12 @@ import ( ) const ( - protocolID1 = "charon/priority/1.1.0" protocolID2 = "charon/priority/2.0.0" ) // Protocols returns the supported protocols of this package in order of precedence. func Protocols() []protocol.ID { - return []protocol.ID{protocolID2, protocolID1} + return []protocol.ID{protocolID2} } // Topic groups priorities in an instance. @@ -118,7 +117,7 @@ func newInternal(tcpNode host.Host, peers []peer.ID, minRequired int, }) // Register prioritiser protocol handler. - registerHandlerFunc("priority", tcpNode, protocolID1, + registerHandlerFunc("priority", tcpNode, protocolID2, func() proto.Message { return new(pbv1.PriorityMsg) }, func(ctx context.Context, pID peer.ID, msg proto.Message) (proto.Message, bool, error) { prioMsg, ok := msg.(*pbv1.PriorityMsg) @@ -133,8 +132,7 @@ func newInternal(tcpNode host.Host, peers []peer.ID, minRequired int, } return resp, true, nil - }, - p2p.WithDelimitedProtocol(protocolID2)) + }) return p } @@ -333,7 +331,7 @@ func exchange(ctx context.Context, tcpNode host.Host, peers []peer.ID, msgValida go func(pID peer.ID) { response := new(pbv1.PriorityMsg) - err := sendFunc(ctx, tcpNode, pID, own, response, protocolID1, p2p.WithDelimitedProtocol(protocolID2)) + err := sendFunc(ctx, tcpNode, pID, own, response, protocolID2) if err != nil { // No need to log, since transport will do it. return diff --git a/dkg/bcast/client.go b/dkg/bcast/client.go index 853648995..073337ac5 100644 --- a/dkg/bcast/client.go +++ b/dkg/bcast/client.go @@ -66,7 +66,7 @@ func (c *client) Broadcast(ctx context.Context, msgID string, msg proto.Message) fork, join, cancel := forkjoin.New(ctx, func(ctx context.Context, pID peer.ID) (*pb.BCastSigResponse, error) { sigResp := new(pb.BCastSigResponse) - err := c.sendRecvFunc(ctx, c.tcpNode, pID, sigReq, sigResp, protocolIDSig, p2p.WithDelimitedProtocol(protocolIDSig)) + err := c.sendRecvFunc(ctx, c.tcpNode, pID, sigReq, sigResp, protocolIDSig) return sigResp, err }) @@ -133,7 +133,7 @@ func (c *client) Broadcast(ctx context.Context, msgID string, msg proto.Message) continue // Skip self. } - err := c.sendFunc(ctx, c.tcpNode, protocolIDMsg, pID, bcastMsg, p2p.WithDelimitedProtocol(protocolIDMsg)) + err := c.sendFunc(ctx, c.tcpNode, protocolIDMsg, pID, bcastMsg) if err != nil { return errors.Wrap(err, "send message") } diff --git a/dkg/bcast/server.go b/dkg/bcast/server.go index a7492f204..4f16ceaf3 100644 --- a/dkg/bcast/server.go +++ b/dkg/bcast/server.go @@ -29,13 +29,11 @@ func newServer(tcpNode host.Host, signFunc signFunc, verifyFunc verifyFunc) *ser p2p.RegisterHandler("bcast", tcpNode, protocolIDSig, func() proto.Message { return new(pb.BCastSigRequest) }, s.handleSigRequest, - p2p.WithDelimitedProtocol(protocolIDSig), ) p2p.RegisterHandler("bcast", tcpNode, protocolIDMsg, func() proto.Message { return new(pb.BCastMessage) }, s.handleMessage, - p2p.WithDelimitedProtocol(protocolIDMsg), ) return s diff --git a/dkg/frostp2p.go b/dkg/frostp2p.go index 4f56a2035..60e74a416 100644 --- a/dkg/frostp2p.go +++ b/dkg/frostp2p.go @@ -55,7 +55,6 @@ func newFrostP2P(tcpNode host.Host, peers map[peer.ID]cluster.NodeIdx, bcastComp p2p.RegisterHandler("frost", tcpNode, round1P2PID, func() proto.Message { return new(pb.FrostRound1P2P) }, newP2PCallback(tcpNode, peers, round1P2PRecv, numVals), - p2p.WithDelimitedProtocol(round1P2PID), ) bcastCallback := newBcastCallback(peers, round1CastsRecv, round2CastsRecv, threshold, numVals) @@ -238,7 +237,7 @@ func (f *frostP2P) Round1(ctx context.Context, castR1 map[msgKey]frost.Round1Bca return nil, nil, errors.New("bug: unexpected p2p message to self") } - err := p2p.Send(ctx, f.tcpNode, round1P2PID, pID, p2pMsg, p2p.WithDelimitedProtocol(round1P2PID)) + err := p2p.Send(ctx, f.tcpNode, round1P2PID, pID, p2pMsg) if err != nil { return nil, nil, err } diff --git a/p2p/receive_test.go b/p2p/receive_test.go index a31da47d5..9649483fb 100644 --- a/p2p/receive_test.go +++ b/p2p/receive_test.go @@ -20,68 +20,21 @@ import ( ) func TestSendReceive(t *testing.T) { - tests := []struct { - name string - delimitedClient bool - delimitedServer bool - }{ - { - name: "non-delimited client and server", - delimitedClient: false, - delimitedServer: false, - }, - { - name: "delimited client and server", - delimitedClient: true, - delimitedServer: true, - }, - { - name: "delimited client and non-delimited server", - delimitedClient: true, - delimitedServer: false, - }, - { - name: "non-delimited client and delimited server", - delimitedClient: false, - delimitedServer: true, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - testSendReceive(t, test.delimitedClient, test.delimitedServer) - }) - } -} - -func testSendReceive(t *testing.T, delimitedClient, delimitedServer bool) { - t.Helper() - var ( - pID1 = protocol.ID("undelimited") - pID2 = protocol.ID("delimited") + pID = protocol.ID("delimited") errNegative = errors.New("negative slot") ctx = context.Background() server = testutil.CreateHost(t, testutil.AvailableAddr(t)) client = testutil.CreateHost(t, testutil.AvailableAddr(t)) ) - var serverOpt []p2p.SendRecvOption - if delimitedServer { - serverOpt = append(serverOpt, p2p.WithDelimitedProtocol(pID2)) - } - - var clientOpt []p2p.SendRecvOption - if delimitedClient { - clientOpt = append(clientOpt, p2p.WithDelimitedProtocol(pID2)) - } - client.Peerstore().AddAddrs(server.ID(), server.Addrs(), peerstore.PermanentAddrTTL) // Register the server handler that either: // - Errors is slot is negative // - Echos the duty request if slot is even // - Returns nothing is slot is odd - p2p.RegisterHandler("server", server, pID1, + p2p.RegisterHandler("server", server, pID, func() proto.Message { return new(pbv1.Duty) }, func(ctx context.Context, peerID peer.ID, req proto.Message) (proto.Message, bool, error) { log.Info(ctx, "See protocol logging field") @@ -98,23 +51,18 @@ func testSendReceive(t *testing.T, delimitedClient, delimitedServer bool) { return nil, false, nil } }, - serverOpt..., ) sendReceive := func(slot int64) (*pbv1.Duty, error) { resp := new(pbv1.Duty) - err := p2p.SendReceive(ctx, client, server.ID(), &pbv1.Duty{Slot: slot}, resp, pID1, clientOpt...) + err := p2p.SendReceive(ctx, client, server.ID(), &pbv1.Duty{Slot: slot}, resp, pID) return resp, err } t.Run("server error", func(t *testing.T) { _, err := sendReceive(-1) - if delimitedClient && delimitedServer { - require.ErrorContains(t, err, "read response: EOF") - } else { - require.ErrorContains(t, err, "no or zero response received") - } + require.ErrorContains(t, err, "read response: EOF") }) t.Run("ok", func(t *testing.T) { @@ -126,10 +74,6 @@ func testSendReceive(t *testing.T, delimitedClient, delimitedServer bool) { t.Run("empty response", func(t *testing.T) { _, err := sendReceive(101) - if delimitedClient && delimitedServer { - require.ErrorContains(t, err, "read response: EOF") - } else { - require.ErrorContains(t, err, "no or zero response received") - } + require.ErrorContains(t, err, "read response: EOF") }) } diff --git a/p2p/sender.go b/p2p/sender.go index 845b86951..16363346e 100644 --- a/p2p/sender.go +++ b/p2p/sender.go @@ -4,7 +4,6 @@ package p2p import ( "context" - "io" "sync" "time" @@ -156,24 +155,15 @@ func WithSendReceiveRTT(callback func(time.Duration)) func(*sendRecvOpts) { } } -// WithDelimitedProtocol returns an option that adds a length delimited read/writer for the provide protocol. -func WithDelimitedProtocol(pID protocol.ID) func(*sendRecvOpts) { - return func(opts *sendRecvOpts) { - opts.protocols = append([]protocol.ID{pID}, opts.protocols...) // Add to front - opts.writersByProtocol[pID] = func(s network.Stream) pbio.Writer { return pbio.NewDelimitedWriter(s) } - opts.readersByProtocol[pID] = func(s network.Stream) pbio.Reader { return pbio.NewDelimitedReader(s, maxMsgSize) } - } -} - -// defaultSendRecvOpts returns the default sendRecvOpts, it uses the legacy writers and noop rtt callback. +// defaultSendRecvOpts returns the default sendRecvOpts, it uses the delimited read-writers and noop rtt callback. func defaultSendRecvOpts(pID protocol.ID) sendRecvOpts { return sendRecvOpts{ protocols: []protocol.ID{pID}, writersByProtocol: map[protocol.ID]func(s network.Stream) pbio.Writer{ - pID: func(s network.Stream) pbio.Writer { return legacyReadWriter{s} }, + pID: func(s network.Stream) pbio.Writer { return pbio.NewDelimitedWriter(s) }, }, readersByProtocol: map[protocol.ID]func(s network.Stream) pbio.Reader{ - pID: func(s network.Stream) pbio.Reader { return legacyReadWriter{s} }, + pID: func(s network.Stream) pbio.Reader { return pbio.NewDelimitedReader(s, maxMsgSize) }, }, rttCallback: func(time.Duration) {}, } @@ -222,19 +212,10 @@ func SendReceive(ctx context.Context, tcpNode host.Host, peerID peer.ID, return errors.Wrap(err, "close write", z.Any("protocol", s.Protocol())) } - zeroResp := proto.Clone(resp) - if err = reader.ReadMsg(resp); err != nil { return errors.Wrap(err, "read response", z.Any("protocol", s.Protocol())) } - // TODO(corver): Remove this once we only use length-delimited protocols. - // This was added since legacy stream delimited readers couldn't distinguish between - // no response and a zero response. - if proto.Equal(resp, zeroResp) { - return errors.New("no or zero response received", z.Any("protocol", s.Protocol())) - } - if err = s.Close(); err != nil { return errors.Wrap(err, "close stream", z.Any("protocol", s.Protocol())) } @@ -274,38 +255,6 @@ func Send(ctx context.Context, tcpNode host.Host, protoID protocol.ID, peerID pe return nil } -// legacyReadWriter implements pbio.Reader and pbio.Writer without length delimited encoding. -type legacyReadWriter struct { - stream network.Stream -} - -// WriteMsg writes a protobuf message to the stream. -func (w legacyReadWriter) WriteMsg(m proto.Message) error { - b, err := proto.Marshal(m) - if err != nil { - return errors.Wrap(err, "marshal proto") - } - - _, err = w.stream.Write(b) - - return err -} - -// ReadMsg reads a single protobuf message from the whole stream. -// The stream must be closed after the message was sent. -func (w legacyReadWriter) ReadMsg(m proto.Message) error { - b, err := io.ReadAll(w.stream) - if err != nil { - return errors.Wrap(err, "read proto") - } - - if err = proto.Unmarshal(b, m); err != nil { - return errors.Wrap(err, "unmarshal proto") - } - - return nil -} - // protocolPrefix returns the common prefix of the provided protocol IDs. func protocolPrefix(pIDs ...protocol.ID) protocol.ID { if len(pIDs) == 0 {