Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: add support for delimited wire protocol upgrades #1885

Merged
merged 3 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/jonboulle/clockwork v0.3.0
github.com/jsternberg/zap-logfmt v1.3.0
github.com/libp2p/go-libp2p v0.25.1
github.com/libp2p/go-msgio v0.3.0
github.com/multiformats/go-multiaddr v0.8.0
github.com/prometheus/client_golang v1.14.0
github.com/protolambda/eth2-shuffle v1.1.0
Expand Down Expand Up @@ -104,7 +105,6 @@ require (
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect
github.com/libp2p/go-msgio v0.3.0 // indirect
github.com/libp2p/go-nat v0.1.0 // indirect
github.com/libp2p/go-netroute v0.2.1 // indirect
github.com/libp2p/go-reuseport v0.2.0 // indirect
Expand Down
24 changes: 13 additions & 11 deletions p2p/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,19 @@ var (
Help: "Total number of libp2p connections per peer.",
}, []string{"peer"})

networkRXCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "p2p",
Name: "peer_network_receive_bytes_total",
Help: "Total number of network bytes received from the peer by protocol.",
}, []string{"peer", "protocol"})

networkTXCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "p2p",
Name: "peer_network_sent_bytes_total",
Help: "Total number of network bytes sent to the peer by protocol.",
}, []string{"peer", "protocol"})
// TODO(corver): re-enable these metrics using libp2p internal features.

// networkRXCounter = promauto.NewCounterVec(prometheus.CounterOpts{
// Namespace: "p2p",
// Name: "peer_network_receive_bytes_total",
// Help: "Total number of network bytes received from the peer by protocol.",
// }, []string{"peer", "protocol"}).

// networkTXCounter = promauto.NewCounterVec(prometheus.CounterOpts{
// Namespace: "p2p",
// Name: "peer_network_sent_bytes_total",
// Help: "Total number of network bytes sent to the peer by protocol.",
// }, []string{"peer", "protocol"}).
)

func observePing(p peer.ID, d time.Duration) {
Expand Down
69 changes: 33 additions & 36 deletions p2p/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package p2p

import (
"context"
"io"
"net"
"time"

Expand All @@ -26,18 +25,30 @@ type HandlerFunc func(ctx context.Context, peerID peer.ID, req proto.Message) (p
// RegisterHandlerFunc abstracts a function that registers a libp2p stream handler
// that reads a single protobuf request and returns an optional response.
type RegisterHandlerFunc func(logTopic string, tcpNode host.Host, protocol protocol.ID,
zeroReq func() proto.Message, handlerFunc HandlerFunc,
zeroReq func() proto.Message, handlerFunc HandlerFunc, opts ...SendRecvOption,
)

// Interface assertions.
var _ RegisterHandlerFunc = RegisterHandler

// RegisterHandler registers a canonical proto request and response handler for the provided protocol.
// - The zeroReq function returns a zero request to unmarshal.
// - The handlerFunc is called with the unmarshalled request and returns either a response or false or an error.
// - The marshalled response is sent back if present.
// - The stream is always closed before returning.
func RegisterHandler(logTopic string, tcpNode host.Host, protocol protocol.ID,
zeroReq func() proto.Message, handlerFunc HandlerFunc,
func RegisterHandler(logTopic string, tcpNode host.Host, pID protocol.ID,
zeroReq func() proto.Message, handlerFunc HandlerFunc, opts ...SendRecvOption,
) {
tcpNode.SetStreamHandler(protocol, func(s network.Stream) {
o := defaultSendRecvOpts(pID)
for _, opt := range opts {
opt(&o)
}

matchProtocol := func(pID protocol.ID) bool {
return o.readersByProtocol[pID] != nil
}

tcpNode.SetStreamHandlerMatch(protocolPrefix(o.protocols...), matchProtocol, func(s network.Stream) {
t0 := time.Now()
name := PeerName(s.Conn().RemotePeer())

Expand All @@ -47,40 +58,34 @@ func RegisterHandler(logTopic string, tcpNode host.Host, protocol protocol.ID,
ctx = log.WithTopic(ctx, logTopic)
ctx = log.WithCtx(ctx,
z.Str("peer", name),
z.Str("protocol", string(protocol)),
z.Any("protocol", s.Protocol()),
)
defer cancel()
defer s.Close()

b, err := io.ReadAll(s)
if IsRelayError(err) {
return // Ignore relay errors.
} else if netErr := net.Error(nil); errors.As(err, &netErr) && netErr.Timeout() {
validPB := proto.Unmarshal(b, zeroReq()) == nil
log.Error(ctx, "LibP2P read timeout", err,
z.Any("duration", time.Since(t0)),
z.I64("bytes", int64(len(b))),
z.Bool("valid_proto", validPB),
)

writeFunc, ok := o.writersByProtocol[s.Protocol()]
if !ok {
log.Error(ctx, "LibP2P no writer for protocol", nil)
return
} else if err != nil {
log.Error(ctx, "LibP2P read request", err,
z.Any("duration", time.Since(t0)),
z.I64("bytes", int64(len(b))),
)

}
readFunc, ok := o.readersByProtocol[s.Protocol()]
if !ok {
log.Error(ctx, "LibP2P no reader for protocol", nil)
return
}

req := zeroReq()
if err := proto.Unmarshal(b, req); err != nil {
log.Error(ctx, "LibP2P unmarshal request", err)
err := readFunc(s).ReadMsg(req)
if IsRelayError(err) {
return // Ignore relay errors.
} else if netErr := net.Error(nil); errors.As(err, &netErr) && netErr.Timeout() {
log.Error(ctx, "LibP2P read timeout", err, z.Any("duration", time.Since(t0)))
return
} else if err != nil {
log.Error(ctx, "LibP2P read request", err, z.Any("duration", time.Since(t0)))
return
}

networkRXCounter.WithLabelValues(name, string(s.Protocol())).Add(float64(len(b)))

resp, ok, err := handlerFunc(ctx, s.Conn().RemotePeer(), req)
if err != nil {
log.Error(ctx, "LibP2P handle stream error", err, z.Any("duration", time.Since(t0)))
Expand All @@ -91,19 +96,11 @@ func RegisterHandler(logTopic string, tcpNode host.Host, protocol protocol.ID,
return
}

b, err = proto.Marshal(resp)
if err != nil {
log.Error(ctx, "LibP2P marshall response", err)
return
}

if _, err := s.Write(b); IsRelayError(err) {
if err := writeFunc(s).WriteMsg(resp); IsRelayError(err) {
return // Ignore relay errors.
} else if err != nil {
log.Error(ctx, "LibP2P write response", err)
return
}

networkTXCounter.WithLabelValues(name, string(s.Protocol())).Add(float64(len(b)))
})
}
69 changes: 64 additions & 5 deletions p2p/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,79 @@ import (
"google.golang.org/protobuf/proto"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
pbv1 "github.com/obolnetwork/charon/core/corepb/v1"
"github.com/obolnetwork/charon/p2p"
"github.com/obolnetwork/charon/testutil"
)

func TestSendReceive(t *testing.T) {
tests := []struct {
name string
delimitedClient bool
delimitedServer bool
}{
{
name: "non-delimited client and server",
delimitedClient: false,
delimitedServer: false,
},
{
name: "delimited client and server",
delimitedClient: true,
delimitedServer: true,
},
{
name: "delimited client and non-delimited server",
delimitedClient: true,
delimitedServer: false,
},
{
name: "non-delimited client and delimited server",
delimitedClient: false,
delimitedServer: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testSendReceive(t, test.delimitedClient, test.delimitedServer)
})
}
}

func testSendReceive(t *testing.T, delimitedClient, delimitedServer bool) {
t.Helper()

var (
protocolID = protocol.ID("test")
pID1 = protocol.ID("undelimited")
pID2 = protocol.ID("delimited")
errNegative = errors.New("negative slot")
ctx = context.Background()
server = testutil.CreateHost(t, testutil.AvailableAddr(t))
client = testutil.CreateHost(t, testutil.AvailableAddr(t))
)

var serverOpt []p2p.SendRecvOption
if delimitedServer {
serverOpt = append(serverOpt, p2p.WithDelimitedProtocol(pID2))
}

var clientOpt []p2p.SendRecvOption
if delimitedClient {
clientOpt = append(clientOpt, p2p.WithDelimitedProtocol(pID2))
}

client.Peerstore().AddAddrs(server.ID(), server.Addrs(), peerstore.PermanentAddrTTL)

// Register the server handler that either:
// - Errors is slot is negative
// - Echos the duty request if slot is even
// - Returns nothing is slot is odd
p2p.RegisterHandler("server", server, protocolID,
p2p.RegisterHandler("server", server, pID1,
func() proto.Message { return new(pbv1.Duty) },
func(ctx context.Context, peerID peer.ID, req proto.Message) (proto.Message, bool, error) {
log.Info(ctx, "See protocol logging field")

require.Equal(t, client.ID(), peerID)
duty, ok := req.(*pbv1.Duty)
require.True(t, ok)
Expand All @@ -48,18 +98,23 @@ func TestSendReceive(t *testing.T) {
return nil, false, nil
}
},
serverOpt...,
)

sendReceive := func(slot int64) (*pbv1.Duty, error) {
resp := new(pbv1.Duty)
err := p2p.SendReceive(ctx, client, server.ID(), &pbv1.Duty{Slot: slot}, resp, protocolID)
err := p2p.SendReceive(ctx, client, server.ID(), &pbv1.Duty{Slot: slot}, resp, pID1, clientOpt...)

return resp, err
}

t.Run("server error", func(t *testing.T) {
_, err := sendReceive(-1)
require.ErrorContains(t, err, "no response")
if delimitedServer && delimitedClient {
require.ErrorContains(t, err, "read response: EOF")
} else {
require.ErrorContains(t, err, "no response")
}
})

t.Run("ok", func(t *testing.T) {
Expand All @@ -71,6 +126,10 @@ func TestSendReceive(t *testing.T) {

t.Run("empty response", func(t *testing.T) {
_, err := sendReceive(101)
require.ErrorContains(t, err, "no response")
if delimitedServer && delimitedClient {
require.ErrorContains(t, err, "read response: EOF")
} else {
require.ErrorContains(t, err, "no response")
}
})
}
Loading