From e08467306f7750b2a3fd96dd17e42e0d19b749ed Mon Sep 17 00:00:00 2001 From: corverroos Date: Wed, 15 Mar 2023 12:05:55 +0200 Subject: [PATCH 1/3] p2p: Add support for delimited wire protocol upgrades --- go.mod | 2 +- p2p/metrics.go | 24 ++++---- p2p/receive.go | 70 +++++++++++---------- p2p/sender.go | 162 +++++++++++++++++++++++++++++++++---------------- 4 files changed, 157 insertions(+), 101 deletions(-) diff --git a/go.mod b/go.mod index f4ab9426b..93ed94dde 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/jonboulle/clockwork v0.3.0 github.com/jsternberg/zap-logfmt v1.3.0 github.com/libp2p/go-libp2p v0.25.1 + github.com/libp2p/go-msgio v0.3.0 github.com/multiformats/go-multiaddr v0.8.0 github.com/prometheus/client_golang v1.14.0 github.com/protolambda/eth2-shuffle v1.1.0 @@ -104,7 +105,6 @@ require ( github.com/libp2p/go-cidranger v1.1.0 // indirect github.com/libp2p/go-flow-metrics v0.1.0 // indirect github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect - github.com/libp2p/go-msgio v0.3.0 // indirect github.com/libp2p/go-nat v0.1.0 // indirect github.com/libp2p/go-netroute v0.2.1 // indirect github.com/libp2p/go-reuseport v0.2.0 // indirect diff --git a/p2p/metrics.go b/p2p/metrics.go index 37ddf4f52..1b2730321 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -59,17 +59,19 @@ var ( Help: "Total number of libp2p connections per peer.", }, []string{"peer"}) - networkRXCounter = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "p2p", - Name: "peer_network_receive_bytes_total", - Help: "Total number of network bytes received from the peer by protocol.", - }, []string{"peer", "protocol"}) - - networkTXCounter = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "p2p", - Name: "peer_network_sent_bytes_total", - Help: "Total number of network bytes sent to the peer by protocol.", - }, []string{"peer", "protocol"}) + // TODO(corver): re-enable these metrics using libp2p internal features. + + // networkRXCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + // Namespace: "p2p", + // Name: "peer_network_receive_bytes_total", + // Help: "Total number of network bytes received from the peer by protocol.", + // }, []string{"peer", "protocol"}). + + // networkTXCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + // Namespace: "p2p", + // Name: "peer_network_sent_bytes_total", + // Help: "Total number of network bytes sent to the peer by protocol.", + // }, []string{"peer", "protocol"}). ) func observePing(p peer.ID, d time.Duration) { diff --git a/p2p/receive.go b/p2p/receive.go index 834bd461f..7e956c741 100644 --- a/p2p/receive.go +++ b/p2p/receive.go @@ -4,7 +4,6 @@ package p2p import ( "context" - "io" "net" "time" @@ -26,18 +25,30 @@ type HandlerFunc func(ctx context.Context, peerID peer.ID, req proto.Message) (p // RegisterHandlerFunc abstracts a function that registers a libp2p stream handler // that reads a single protobuf request and returns an optional response. type RegisterHandlerFunc func(logTopic string, tcpNode host.Host, protocol protocol.ID, - zeroReq func() proto.Message, handlerFunc HandlerFunc, + zeroReq func() proto.Message, handlerFunc HandlerFunc, opts ...SendRecvOption, ) +// Interface assertions. +var _ RegisterHandlerFunc = RegisterHandler + // RegisterHandler registers a canonical proto request and response handler for the provided protocol. // - The zeroReq function returns a zero request to unmarshal. // - The handlerFunc is called with the unmarshalled request and returns either a response or false or an error. // - The marshalled response is sent back if present. // - The stream is always closed before returning. -func RegisterHandler(logTopic string, tcpNode host.Host, protocol protocol.ID, - zeroReq func() proto.Message, handlerFunc HandlerFunc, +func RegisterHandler(logTopic string, tcpNode host.Host, pID protocol.ID, + zeroReq func() proto.Message, handlerFunc HandlerFunc, opts ...SendRecvOption, ) { - tcpNode.SetStreamHandler(protocol, func(s network.Stream) { + o := defaultSendRecvOpts(pID) + for _, opt := range opts { + opt(&o) + } + + matchProtocol := func(pID protocol.ID) bool { + return o.readersByProtocol[pID] != nil + } + + tcpNode.SetStreamHandlerMatch(pID, matchProtocol, func(s network.Stream) { t0 := time.Now() name := PeerName(s.Conn().RemotePeer()) @@ -47,40 +58,35 @@ func RegisterHandler(logTopic string, tcpNode host.Host, protocol protocol.ID, ctx = log.WithTopic(ctx, logTopic) ctx = log.WithCtx(ctx, z.Str("peer", name), - z.Str("protocol", string(protocol)), + z.Str("pID", string(pID)), + z.Any("protocol", s.Protocol()), ) defer cancel() defer s.Close() - b, err := io.ReadAll(s) - if IsRelayError(err) { - return // Ignore relay errors. - } else if netErr := net.Error(nil); errors.As(err, &netErr) && netErr.Timeout() { - validPB := proto.Unmarshal(b, zeroReq()) == nil - log.Error(ctx, "LibP2P read timeout", err, - z.Any("duration", time.Since(t0)), - z.I64("bytes", int64(len(b))), - z.Bool("valid_proto", validPB), - ) - + writeFunc, ok := o.writersByProtocol[s.Protocol()] + if !ok { + log.Error(ctx, "LibP2P no writer for protocol", nil) return - } else if err != nil { - log.Error(ctx, "LibP2P read request", err, - z.Any("duration", time.Since(t0)), - z.I64("bytes", int64(len(b))), - ) - + } + readFunc, ok := o.readersByProtocol[s.Protocol()] + if !ok { + log.Error(ctx, "LibP2P no reader for protocol", nil) return } req := zeroReq() - if err := proto.Unmarshal(b, req); err != nil { - log.Error(ctx, "LibP2P unmarshal request", err) + err := readFunc(s).ReadMsg(req) + if IsRelayError(err) { + return // Ignore relay errors. + } else if netErr := net.Error(nil); errors.As(err, &netErr) && netErr.Timeout() { + log.Error(ctx, "LibP2P read timeout", err, z.Any("duration", time.Since(t0))) + return + } else if err != nil { + log.Error(ctx, "LibP2P read request", err, z.Any("duration", time.Since(t0))) return } - networkRXCounter.WithLabelValues(name, string(s.Protocol())).Add(float64(len(b))) - resp, ok, err := handlerFunc(ctx, s.Conn().RemotePeer(), req) if err != nil { log.Error(ctx, "LibP2P handle stream error", err, z.Any("duration", time.Since(t0))) @@ -91,19 +97,11 @@ func RegisterHandler(logTopic string, tcpNode host.Host, protocol protocol.ID, return } - b, err = proto.Marshal(resp) - if err != nil { - log.Error(ctx, "LibP2P marshall response", err) - return - } - - if _, err := s.Write(b); IsRelayError(err) { + if err := writeFunc(s).WriteMsg(resp); IsRelayError(err) { return // Ignore relay errors. } else if err != nil { log.Error(ctx, "LibP2P write response", err) return } - - networkTXCounter.WithLabelValues(name, string(s.Protocol())).Add(float64(len(b))) }) } diff --git a/p2p/sender.go b/p2p/sender.go index ef80eeb39..11d53a65a 100644 --- a/p2p/sender.go +++ b/p2p/sender.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-msgio/pbio" "google.golang.org/protobuf/proto" "github.com/obolnetwork/charon/app/errors" @@ -22,15 +23,16 @@ import ( const ( senderHysteresis = 3 senderBuffer = senderHysteresis + 1 + maxMsgSize = 32 << 20 // 32MB ) // SendFunc is an abstract function responsible for sending libp2p messages. -type SendFunc func(context.Context, host.Host, protocol.ID, peer.ID, proto.Message) error +type SendFunc func(context.Context, host.Host, protocol.ID, peer.ID, proto.Message, ...SendRecvOption) error // SendReceiveFunc is an abstract function responsible for sending a libp2p request and returning // (populating) a libp2p response. type SendReceiveFunc func(ctx context.Context, tcpNode host.Host, peerID peer.ID, - req, resp proto.Message, protocol protocol.ID, opts ...func(*sendRecvOpts)) error + req, resp proto.Message, protocol protocol.ID, opts ...SendRecvOption) error var ( _ SendFunc = Send @@ -96,14 +98,15 @@ func (s *Sender) addResult(ctx context.Context, peerID peer.ID, err error) { // SendAsync returns nil and sends a libp2p message asynchronously. // It logs results on state change (success to/from failure). // It implements SendFunc. -func (s *Sender) SendAsync(parent context.Context, tcpNode host.Host, protoID protocol.ID, peerID peer.ID, msg proto.Message) error { +func (s *Sender) SendAsync(parent context.Context, tcpNode host.Host, protoID protocol.ID, peerID peer.ID, + msg proto.Message, opts ...SendRecvOption, +) error { go func() { // Clone the context since parent context may be closed soon. ctx := log.CopyFields(context.Background(), parent) - ctx = log.WithCtx(ctx, z.Str("protocol", string(protoID))) err := withRelayRetry(func() error { - return Send(ctx, tcpNode, protoID, peerID, msg) + return Send(ctx, tcpNode, protoID, peerID, msg, opts...) }) s.addResult(ctx, peerID, err) }() @@ -116,7 +119,7 @@ func (s *Sender) SendAsync(parent context.Context, tcpNode host.Host, protoID pr // It logs results on state change (success to/from failure). // It implements SendReceiveFunc. func (s *Sender) SendReceive(ctx context.Context, tcpNode host.Host, peerID peer.ID, req, resp proto.Message, - protocol protocol.ID, opts ...func(*sendRecvOpts), + protocol protocol.ID, opts ...SendRecvOption, ) error { err := withRelayRetry(func() error { return SendReceive(ctx, tcpNode, peerID, req, resp, protocol, opts...) @@ -137,9 +140,21 @@ func withRelayRetry(fn func() error) error { return err } +type SendRecvOption func(*sendRecvOpts) + type sendRecvOpts struct { - pids []protocol.ID - rttCallback func(time.Duration) + writersByProtocol map[protocol.ID]func(network.Stream) pbio.Writer + readersByProtocol map[protocol.ID]func(network.Stream) pbio.Reader + rttCallback func(time.Duration) +} + +func (o sendRecvOpts) WriteProtocols() []protocol.ID { + var protocols []protocol.ID + for p := range o.writersByProtocol { + protocols = append(protocols, p) + } + + return protocols } // WithSendReceiveRTT returns an option for SendReceive that sets a callback for the RTT. @@ -149,11 +164,24 @@ func WithSendReceiveRTT(callback func(time.Duration)) func(*sendRecvOpts) { } } -// WithSendReceiveProtocols returns an option for SendReceive that sets the protocols to use. -// Note this overrides the protocol provided in the SendReceive. -func WithSendReceiveProtocols(pids ...protocol.ID) func(*sendRecvOpts) { +// WithDelimitedProtocol returns an option that adds a length delimited read/writer for the provide protocol. +func WithDelimitedProtocol(pID protocol.ID) func(*sendRecvOpts) { return func(opts *sendRecvOpts) { - opts.pids = pids + opts.writersByProtocol[pID] = func(s network.Stream) pbio.Writer { return pbio.NewDelimitedWriter(s) } + opts.readersByProtocol[pID] = func(s network.Stream) pbio.Reader { return pbio.NewDelimitedReader(s, maxMsgSize) } + } +} + +// defaultSendRecvOpts returns the default sendRecvOpts, it uses the legacy writers and noop rtt callback. +func defaultSendRecvOpts(pID protocol.ID) sendRecvOpts { + return sendRecvOpts{ + writersByProtocol: map[protocol.ID]func(s network.Stream) pbio.Writer{ + pID: func(s network.Stream) pbio.Writer { return legacyReadWriter{s} }, + }, + readersByProtocol: map[protocol.ID]func(s network.Stream) pbio.Reader{ + pID: func(s network.Stream) pbio.Reader { return legacyReadWriter{s} }, + }, + rttCallback: func(time.Duration) {}, } } @@ -162,85 +190,79 @@ func WithSendReceiveProtocols(pids ...protocol.ID) func(*sendRecvOpts) { // The provided response proto will be populated if err is nil. // It implements SendReceiveFunc. func SendReceive(ctx context.Context, tcpNode host.Host, peerID peer.ID, - req, resp proto.Message, pID protocol.ID, opts ...func(*sendRecvOpts), + req, resp proto.Message, pID protocol.ID, opts ...SendRecvOption, ) error { - o := sendRecvOpts{ - pids: []protocol.ID{pID}, - rttCallback: func(time.Duration) {}, - } + o := defaultSendRecvOpts(pID) for _, opt := range opts { opt(&o) } - ctx = log.WithCtx(ctx, z.Any("protocol", o.pids)) - b, err := proto.Marshal(req) + // Circuit relay connections are transient + s, err := tcpNode.NewStream(network.WithUseTransient(ctx, ""), peerID, o.WriteProtocols()...) if err != nil { - return errors.Wrap(err, "marshal proto") + return errors.Wrap(err, "new stream", z.Any("protocols", o.WriteProtocols())) } - // Circuit relay connections are transient - s, err := tcpNode.NewStream(network.WithUseTransient(ctx, ""), peerID, o.pids...) - if err != nil { - return errors.Wrap(err, "new stream", z.Any("protocols", o.pids)) + writeFunc, ok := o.writersByProtocol[s.Protocol()] + if !ok { + return errors.New("no writer for protocol", z.Any("protocol", s.Protocol())) + } + readFunc, ok := o.readersByProtocol[s.Protocol()] + if !ok { + return errors.New("no reader for protocol", z.Any("protocol", s.Protocol())) } + writer := writeFunc(s) + reader := readFunc(s) + t0 := time.Now() - if _, err = s.Write(b); err != nil { - return errors.Wrap(err, "write request") + if err = writer.WriteMsg(req); err != nil { + return errors.Wrap(err, "write request", z.Any("protocol", s.Protocol())) } if err := s.CloseWrite(); err != nil { - return errors.Wrap(err, "close write") + return errors.Wrap(err, "close write", z.Any("protocol", s.Protocol())) } - name := PeerName(peerID) - networkTXCounter.WithLabelValues(name, string(s.Protocol())).Add(float64(len(b))) - - b, err = io.ReadAll(s) - if err != nil { - return errors.Wrap(err, "read response") - } else if len(b) == 0 { - return errors.New("peer errored, no response") - } - - if err = proto.Unmarshal(b, resp); err != nil { - return errors.Wrap(err, "unmarshal response") + if err = reader.ReadMsg(resp); err != nil { + return errors.Wrap(err, "read response", z.Any("protocol", s.Protocol())) } if err = s.Close(); err != nil { - return errors.Wrap(err, "unmarshal response") + return errors.Wrap(err, "close stream", z.Any("protocol", s.Protocol())) } o.rttCallback(time.Since(t0)) - networkRXCounter.WithLabelValues(name, string(s.Protocol())).Add(float64(len(b))) - return nil } // Send sends a libp2p message synchronously. It implements SendFunc. -func Send(ctx context.Context, tcpNode host.Host, protoID protocol.ID, peerID peer.ID, msg proto.Message) error { - b, err := proto.Marshal(msg) - if err != nil { - return errors.Wrap(err, "marshal proto") +func Send(ctx context.Context, tcpNode host.Host, protoID protocol.ID, peerID peer.ID, msg proto.Message, + opts ...SendRecvOption, +) error { + o := defaultSendRecvOpts(protoID) + for _, opt := range opts { + opt(&o) } - // Circuit relay connections are transient s, err := tcpNode.NewStream(network.WithUseTransient(ctx, ""), peerID, protoID) if err != nil { return errors.Wrap(err, "tcpNode stream") } - _, err = s.Write(b) - if err != nil { - return errors.Wrap(err, "tcpNode write") + writeFunc, ok := o.writersByProtocol[s.Protocol()] + if !ok { + return errors.New("no writer for protocol", z.Any("protocol", s.Protocol())) } - if err := s.Close(); err != nil { - return errors.Wrap(err, "tcpNode close") + if err = writeFunc(s).WriteMsg(msg); err != nil { + return errors.Wrap(err, "write message", z.Any("protocol", s.Protocol())) } - networkTXCounter.WithLabelValues(PeerName(peerID), string(s.Protocol())).Add(float64(len(b))) + if err := s.Close(); err != nil { + return errors.Wrap(err, "close stream", z.Any("protocol", s.Protocol())) + } return nil } @@ -261,3 +283,37 @@ func ProtocolSupported(tcpNode host.Host, peerID peer.ID, protocolID protocol.ID return false, true // Not supported } + +// legacyReadWriter implements pbio.Reader and pbio.Writer without length delimited encoding. +type legacyReadWriter struct { + stream network.Stream +} + +// WriteMsg writes a protobuf message to the stream. +func (w legacyReadWriter) WriteMsg(m proto.Message) error { + b, err := proto.Marshal(m) + if err != nil { + return errors.Wrap(err, "marshal proto") + } + + _, err = w.stream.Write(b) + + return err +} + +// ReadMsg reads a single protobuf message from the whole stream. +// The stream must be closed after the message was sent. +func (w legacyReadWriter) ReadMsg(m proto.Message) error { + b, err := io.ReadAll(w.stream) + if err != nil { + return errors.Wrap(err, "read response") + } else if len(b) == 0 { + return errors.New("peer errored, no response") + } + + if err = proto.Unmarshal(b, m); err != nil { + return errors.Wrap(err, "unmarshal response") + } + + return nil +} From 80768e6f2e8bf4db3bbd3e7c42314d7de47db295 Mon Sep 17 00:00:00 2001 From: corverroos Date: Wed, 15 Mar 2023 12:48:11 +0200 Subject: [PATCH 2/3] cleanup --- p2p/receive.go | 3 +- p2p/receive_test.go | 69 ++++++++++++++++++++++++++++++++++--- p2p/sender.go | 42 ++++++++++++++++------ p2p/sender_internal_test.go | 6 ++++ 4 files changed, 102 insertions(+), 18 deletions(-) diff --git a/p2p/receive.go b/p2p/receive.go index 7e956c741..99709e9ba 100644 --- a/p2p/receive.go +++ b/p2p/receive.go @@ -48,7 +48,7 @@ func RegisterHandler(logTopic string, tcpNode host.Host, pID protocol.ID, return o.readersByProtocol[pID] != nil } - tcpNode.SetStreamHandlerMatch(pID, matchProtocol, func(s network.Stream) { + tcpNode.SetStreamHandlerMatch(protocolPrefix(o.protocols...), matchProtocol, func(s network.Stream) { t0 := time.Now() name := PeerName(s.Conn().RemotePeer()) @@ -58,7 +58,6 @@ func RegisterHandler(logTopic string, tcpNode host.Host, pID protocol.ID, ctx = log.WithTopic(ctx, logTopic) ctx = log.WithCtx(ctx, z.Str("peer", name), - z.Str("pID", string(pID)), z.Any("protocol", s.Protocol()), ) defer cancel() diff --git a/p2p/receive_test.go b/p2p/receive_test.go index 0e31a6ec8..a5e5a59b0 100644 --- a/p2p/receive_test.go +++ b/p2p/receive_test.go @@ -13,29 +13,79 @@ import ( "google.golang.org/protobuf/proto" "github.com/obolnetwork/charon/app/errors" + "github.com/obolnetwork/charon/app/log" pbv1 "github.com/obolnetwork/charon/core/corepb/v1" "github.com/obolnetwork/charon/p2p" "github.com/obolnetwork/charon/testutil" ) func TestSendReceive(t *testing.T) { + tests := []struct { + name string + delimitedClient bool + delimitedServer bool + }{ + { + name: "non-delimited client and server", + delimitedClient: false, + delimitedServer: false, + }, + { + name: "delimited client and server", + delimitedClient: true, + delimitedServer: true, + }, + { + name: "delimited client and non-delimited server", + delimitedClient: true, + delimitedServer: false, + }, + { + name: "non-delimited client and delimited server", + delimitedClient: false, + delimitedServer: true, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + testSendReceive(t, test.delimitedClient, test.delimitedServer) + }) + } +} + +func testSendReceive(t *testing.T, delimitedClient, delimitedServer bool) { + t.Helper() + var ( - protocolID = protocol.ID("test") + pID1 = protocol.ID("undelimited") + pID2 = protocol.ID("delimited") errNegative = errors.New("negative slot") ctx = context.Background() server = testutil.CreateHost(t, testutil.AvailableAddr(t)) client = testutil.CreateHost(t, testutil.AvailableAddr(t)) ) + var serverOpt []p2p.SendRecvOption + if delimitedServer { + serverOpt = append(serverOpt, p2p.WithDelimitedProtocol(pID2)) + } + + var clientOpt []p2p.SendRecvOption + if delimitedClient { + clientOpt = append(clientOpt, p2p.WithDelimitedProtocol(pID2)) + } + client.Peerstore().AddAddrs(server.ID(), server.Addrs(), peerstore.PermanentAddrTTL) // Register the server handler that either: // - Errors is slot is negative // - Echos the duty request if slot is even // - Returns nothing is slot is odd - p2p.RegisterHandler("server", server, protocolID, + p2p.RegisterHandler("server", server, pID1, func() proto.Message { return new(pbv1.Duty) }, func(ctx context.Context, peerID peer.ID, req proto.Message) (proto.Message, bool, error) { + log.Info(ctx, "See protocol logging field") + require.Equal(t, client.ID(), peerID) duty, ok := req.(*pbv1.Duty) require.True(t, ok) @@ -48,18 +98,23 @@ func TestSendReceive(t *testing.T) { return nil, false, nil } }, + serverOpt..., ) sendReceive := func(slot int64) (*pbv1.Duty, error) { resp := new(pbv1.Duty) - err := p2p.SendReceive(ctx, client, server.ID(), &pbv1.Duty{Slot: slot}, resp, protocolID) + err := p2p.SendReceive(ctx, client, server.ID(), &pbv1.Duty{Slot: slot}, resp, pID1, clientOpt...) return resp, err } t.Run("server error", func(t *testing.T) { _, err := sendReceive(-1) - require.ErrorContains(t, err, "no response") + if delimitedServer && delimitedClient { + require.ErrorContains(t, err, "read response: EOF") + } else { + require.ErrorContains(t, err, "no response") + } }) t.Run("ok", func(t *testing.T) { @@ -71,6 +126,10 @@ func TestSendReceive(t *testing.T) { t.Run("empty response", func(t *testing.T) { _, err := sendReceive(101) - require.ErrorContains(t, err, "no response") + if delimitedServer && delimitedClient { + require.ErrorContains(t, err, "read response: EOF") + } else { + require.ErrorContains(t, err, "no response") + } }) } diff --git a/p2p/sender.go b/p2p/sender.go index 11d53a65a..e36643745 100644 --- a/p2p/sender.go +++ b/p2p/sender.go @@ -143,20 +143,12 @@ func withRelayRetry(fn func() error) error { type SendRecvOption func(*sendRecvOpts) type sendRecvOpts struct { + protocols []protocol.ID // Protocols ordered by higher priority first writersByProtocol map[protocol.ID]func(network.Stream) pbio.Writer readersByProtocol map[protocol.ID]func(network.Stream) pbio.Reader rttCallback func(time.Duration) } -func (o sendRecvOpts) WriteProtocols() []protocol.ID { - var protocols []protocol.ID - for p := range o.writersByProtocol { - protocols = append(protocols, p) - } - - return protocols -} - // WithSendReceiveRTT returns an option for SendReceive that sets a callback for the RTT. func WithSendReceiveRTT(callback func(time.Duration)) func(*sendRecvOpts) { return func(opts *sendRecvOpts) { @@ -167,6 +159,7 @@ func WithSendReceiveRTT(callback func(time.Duration)) func(*sendRecvOpts) { // WithDelimitedProtocol returns an option that adds a length delimited read/writer for the provide protocol. func WithDelimitedProtocol(pID protocol.ID) func(*sendRecvOpts) { return func(opts *sendRecvOpts) { + opts.protocols = append([]protocol.ID{pID}, opts.protocols...) // Add to front opts.writersByProtocol[pID] = func(s network.Stream) pbio.Writer { return pbio.NewDelimitedWriter(s) } opts.readersByProtocol[pID] = func(s network.Stream) pbio.Reader { return pbio.NewDelimitedReader(s, maxMsgSize) } } @@ -175,6 +168,7 @@ func WithDelimitedProtocol(pID protocol.ID) func(*sendRecvOpts) { // defaultSendRecvOpts returns the default sendRecvOpts, it uses the legacy writers and noop rtt callback. func defaultSendRecvOpts(pID protocol.ID) sendRecvOpts { return sendRecvOpts{ + protocols: []protocol.ID{pID}, writersByProtocol: map[protocol.ID]func(s network.Stream) pbio.Writer{ pID: func(s network.Stream) pbio.Writer { return legacyReadWriter{s} }, }, @@ -198,9 +192,9 @@ func SendReceive(ctx context.Context, tcpNode host.Host, peerID peer.ID, } // Circuit relay connections are transient - s, err := tcpNode.NewStream(network.WithUseTransient(ctx, ""), peerID, o.WriteProtocols()...) + s, err := tcpNode.NewStream(network.WithUseTransient(ctx, ""), peerID, o.protocols...) if err != nil { - return errors.Wrap(err, "new stream", z.Any("protocols", o.WriteProtocols())) + return errors.Wrap(err, "new stream", z.Any("protocols", o.protocols)) } writeFunc, ok := o.writersByProtocol[s.Protocol()] @@ -317,3 +311,29 @@ func (w legacyReadWriter) ReadMsg(m proto.Message) error { return nil } + +// protocolPrefix returns the common prefix of the provided protocol IDs. +func protocolPrefix(pIDs ...protocol.ID) protocol.ID { + if len(pIDs) == 0 { + return "" + } + if len(pIDs) == 1 { + return pIDs[0] + } + + prefix := pIDs[0] + for _, pID := range pIDs { + for i := 0; i < len(prefix) && i < len(pID); i++ { + if prefix[i] != pID[i] { + prefix = prefix[:i] + break + } + } + } + + if len(prefix) < len(pIDs[0]) { + prefix += "*" + } + + return prefix +} diff --git a/p2p/sender_internal_test.go b/p2p/sender_internal_test.go index 221d10cc0..620aec28c 100644 --- a/p2p/sender_internal_test.go +++ b/p2p/sender_internal_test.go @@ -97,3 +97,9 @@ func (h *testHost) NewStream(context.Context, peer.ID, ...protocol.ID) (network. return nil, network.ErrReset } + +func TestProtocolPrefix(b *testing.T) { + require.EqualValues(b, "charon/peer_info/1.0.0", protocolPrefix("charon/peer_info/1.0.0")) + require.EqualValues(b, "charon/peer_info/1.*", protocolPrefix("charon/peer_info/1.0.0", "charon/peer_info/1.1.0")) + require.EqualValues(b, "charon/peer_info/*", protocolPrefix("charon/peer_info/1.0.0", "charon/peer_info/2.0.0", "charon/peer_info/3.0.0")) +} From bdc069ebd8b457524edd5db2b6036e2c500e9905 Mon Sep 17 00:00:00 2001 From: corverroos Date: Wed, 15 Mar 2023 15:12:34 +0200 Subject: [PATCH 3/3] cleanup --- p2p/sender.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/sender.go b/p2p/sender.go index e36643745..fe6753e25 100644 --- a/p2p/sender.go +++ b/p2p/sender.go @@ -23,7 +23,7 @@ import ( const ( senderHysteresis = 3 senderBuffer = senderHysteresis + 1 - maxMsgSize = 32 << 20 // 32MB + maxMsgSize = 128 << 20 // 128MB ) // SendFunc is an abstract function responsible for sending libp2p messages.