From aba2bb5f5f0c57c930f7f0b5d9519bf6fdf3c724 Mon Sep 17 00:00:00 2001 From: Dhruv Bodani Date: Mon, 3 Jul 2023 20:21:23 +0530 Subject: [PATCH] p2p: add support for multi protocols to Send (#2387) Add support for multi protocols to `p2p.Send(...)`. category: bug ticket: #2386 --- p2p/receive_test.go | 117 +++++++++++++++++++++------ p2p/sender.go | 2 +- p2p/sender_test.go | 189 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 283 insertions(+), 25 deletions(-) create mode 100644 p2p/sender_test.go diff --git a/p2p/receive_test.go b/p2p/receive_test.go index a31da47d5..15707646f 100644 --- a/p2p/receive_test.go +++ b/p2p/receive_test.go @@ -20,45 +20,93 @@ import ( ) func TestSendReceive(t *testing.T) { + var ( + undelimID = protocol.ID("undelimited") + delimID = protocol.ID("delimited") + ) + tests := []struct { - name string - delimitedClient bool - delimitedServer bool + name string + delimitedClient bool + delimitedServer bool + clientBasicProtoID protocol.ID + serverBasicProtoID protocol.ID }{ { - name: "non-delimited client and server", - delimitedClient: false, - delimitedServer: false, + name: "non-delimited client and server", + delimitedClient: false, + delimitedServer: false, + clientBasicProtoID: undelimID, + serverBasicProtoID: undelimID, + }, + { + name: "delimited client and server", + delimitedClient: true, + delimitedServer: true, + clientBasicProtoID: undelimID, + serverBasicProtoID: undelimID, + }, + { + name: "delimited client and non-delimited server", + delimitedClient: true, + delimitedServer: false, + clientBasicProtoID: undelimID, + serverBasicProtoID: undelimID, + }, + { + name: "non-delimited client and delimited server", + delimitedClient: false, + delimitedServer: true, + clientBasicProtoID: undelimID, + serverBasicProtoID: undelimID, + }, + { + name: "delimited only client and delimited server", + delimitedClient: true, + delimitedServer: true, + clientBasicProtoID: delimID, + serverBasicProtoID: undelimID, }, { - name: "delimited client and server", - delimitedClient: true, - delimitedServer: true, + name: "delimited client and delimited only server", + delimitedClient: true, + delimitedServer: true, + clientBasicProtoID: undelimID, + serverBasicProtoID: delimID, }, { - name: "delimited client and non-delimited server", - delimitedClient: true, - delimitedServer: false, + name: "delimited only client and delimited only server", + delimitedClient: true, + delimitedServer: true, + clientBasicProtoID: delimID, + serverBasicProtoID: delimID, }, { - name: "non-delimited client and delimited server", - delimitedClient: false, - delimitedServer: true, + name: "delimited only client and non-delimited server, protocols not supported", + delimitedClient: true, + delimitedServer: false, + clientBasicProtoID: delimID, + serverBasicProtoID: undelimID, + }, + { + name: "non-delimited client and delimited only server, protocols not supported", + delimitedClient: false, + delimitedServer: true, + clientBasicProtoID: undelimID, + serverBasicProtoID: delimID, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - testSendReceive(t, test.delimitedClient, test.delimitedServer) + testSendReceive(t, test.clientBasicProtoID, test.serverBasicProtoID, delimID, test.delimitedClient, test.delimitedServer) }) } } -func testSendReceive(t *testing.T, delimitedClient, delimitedServer bool) { +func testSendReceive(t *testing.T, clientBasicProtoID, serverBasicProtoID, delimitedID protocol.ID, delimitedClient, delimitedServer bool) { t.Helper() var ( - pID1 = protocol.ID("undelimited") - pID2 = protocol.ID("delimited") errNegative = errors.New("negative slot") ctx = context.Background() server = testutil.CreateHost(t, testutil.AvailableAddr(t)) @@ -67,21 +115,21 @@ func testSendReceive(t *testing.T, delimitedClient, delimitedServer bool) { var serverOpt []p2p.SendRecvOption if delimitedServer { - serverOpt = append(serverOpt, p2p.WithDelimitedProtocol(pID2)) + serverOpt = append(serverOpt, p2p.WithDelimitedProtocol(delimitedID)) } var clientOpt []p2p.SendRecvOption if delimitedClient { - clientOpt = append(clientOpt, p2p.WithDelimitedProtocol(pID2)) + clientOpt = append(clientOpt, p2p.WithDelimitedProtocol(delimitedID)) } client.Peerstore().AddAddrs(server.ID(), server.Addrs(), peerstore.PermanentAddrTTL) // Register the server handler that either: - // - Errors is slot is negative + // - Errors if slot is negative // - Echos the duty request if slot is even // - Returns nothing is slot is odd - p2p.RegisterHandler("server", server, pID1, + p2p.RegisterHandler("server", server, serverBasicProtoID, func() proto.Message { return new(pbv1.Duty) }, func(ctx context.Context, peerID peer.ID, req proto.Message) (proto.Message, bool, error) { log.Info(ctx, "See protocol logging field") @@ -103,11 +151,32 @@ func testSendReceive(t *testing.T, delimitedClient, delimitedServer bool) { sendReceive := func(slot int64) (*pbv1.Duty, error) { resp := new(pbv1.Duty) - err := p2p.SendReceive(ctx, client, server.ID(), &pbv1.Duty{Slot: slot}, resp, pID1, clientOpt...) + err := p2p.SendReceive(ctx, client, server.ID(), &pbv1.Duty{Slot: slot}, resp, clientBasicProtoID, clientOpt...) return resp, err } + protocolNotSupported := func() bool { + // Client supports ONLY delimited protocol while Server supports ONLY non-delimited protocol. + if clientBasicProtoID == delimitedID && !delimitedServer { + return true + } + + // Server supports ONLY delimited protocol while Client supports ONLY non-delimited protocol. + if serverBasicProtoID == delimitedID && !delimitedClient { + return true + } + + return false + } + + if protocolNotSupported() { + _, err := sendReceive(100) + require.ErrorContains(t, err, "protocols not supported") + + return + } + t.Run("server error", func(t *testing.T) { _, err := sendReceive(-1) if delimitedClient && delimitedServer { diff --git a/p2p/sender.go b/p2p/sender.go index 845b86951..e4d1dc83b 100644 --- a/p2p/sender.go +++ b/p2p/sender.go @@ -253,7 +253,7 @@ func Send(ctx context.Context, tcpNode host.Host, protoID protocol.ID, peerID pe opt(&o) } // Circuit relay connections are transient - s, err := tcpNode.NewStream(network.WithUseTransient(ctx, ""), peerID, protoID) + s, err := tcpNode.NewStream(network.WithUseTransient(ctx, ""), peerID, o.protocols...) if err != nil { return errors.Wrap(err, "tcpNode stream") } diff --git a/p2p/sender_test.go b/p2p/sender_test.go new file mode 100644 index 000000000..1c9b93c56 --- /dev/null +++ b/p2p/sender_test.go @@ -0,0 +1,189 @@ +// Copyright © 2022-2023 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1 + +package p2p_test + +import ( + "context" + "testing" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + "github.com/obolnetwork/charon/app/errors" + "github.com/obolnetwork/charon/app/log" + pbv1 "github.com/obolnetwork/charon/core/corepb/v1" + "github.com/obolnetwork/charon/p2p" + "github.com/obolnetwork/charon/testutil" +) + +func TestSend(t *testing.T) { + var ( + undelimID = protocol.ID("undelimited") + delimID = protocol.ID("delimited") + ) + + tests := []struct { + name string + delimitedClient bool + delimitedServer bool + clientBasicProtoID protocol.ID + serverBasicProtoID protocol.ID + }{ + { + name: "non-delimited client and server", + delimitedClient: false, + delimitedServer: false, + clientBasicProtoID: undelimID, + serverBasicProtoID: undelimID, + }, + { + name: "delimited client and server", + delimitedClient: true, + delimitedServer: true, + clientBasicProtoID: undelimID, + serverBasicProtoID: undelimID, + }, + { + name: "delimited client and non-delimited server", + delimitedClient: true, + delimitedServer: false, + clientBasicProtoID: undelimID, + serverBasicProtoID: undelimID, + }, + { + name: "non-delimited client and delimited server", + delimitedClient: false, + delimitedServer: true, + clientBasicProtoID: undelimID, + serverBasicProtoID: undelimID, + }, + { + name: "delimited only client and delimited server", + delimitedClient: true, + delimitedServer: true, + clientBasicProtoID: delimID, + serverBasicProtoID: undelimID, + }, + { + name: "delimited client and delimited only server", + delimitedClient: true, + delimitedServer: true, + clientBasicProtoID: undelimID, + serverBasicProtoID: delimID, + }, + { + name: "delimited only client and delimited only server", + delimitedClient: true, + delimitedServer: true, + clientBasicProtoID: delimID, + serverBasicProtoID: delimID, + }, + { + name: "delimited only client and non-delimited server, protocols not supported", + delimitedClient: true, + delimitedServer: false, + clientBasicProtoID: delimID, + serverBasicProtoID: undelimID, + }, + { + name: "non-delimited client and delimited only server, protocols not supported", + delimitedClient: false, + delimitedServer: true, + clientBasicProtoID: undelimID, + serverBasicProtoID: delimID, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + testSend(t, test.clientBasicProtoID, test.serverBasicProtoID, delimID, test.delimitedClient, test.delimitedServer) + }) + } +} + +func testSend(t *testing.T, clientBasicProtoID, serverBasicProtoID, delimitedID protocol.ID, delimitedClient, delimitedServer bool) { + t.Helper() + + var ( + errNegative = errors.New("negative slot") + ctx = context.Background() + server = testutil.CreateHost(t, testutil.AvailableAddr(t)) + client = testutil.CreateHost(t, testutil.AvailableAddr(t)) + ) + + var serverOpt []p2p.SendRecvOption + if delimitedServer { + serverOpt = append(serverOpt, p2p.WithDelimitedProtocol(delimitedID)) + } + + var clientOpt []p2p.SendRecvOption + if delimitedClient { + clientOpt = append(clientOpt, p2p.WithDelimitedProtocol(delimitedID)) + } + + client.Peerstore().AddAddrs(server.ID(), server.Addrs(), peerstore.PermanentAddrTTL) + + // Catch server errors. + serverErrChan := make(chan error) + + // Register the server handler that either: + // - Errors if slot is negative + // - Returns nothing otherwise + p2p.RegisterHandler("server", server, serverBasicProtoID, + func() proto.Message { return new(pbv1.Duty) }, + func(ctx context.Context, peerID peer.ID, req proto.Message) (proto.Message, bool, error) { + log.Info(ctx, "See protocol logging field") + + require.Equal(t, client.ID(), peerID) + duty, ok := req.(*pbv1.Duty) + require.True(t, ok) + + var err error + defer func() { + serverErrChan <- err + }() + + if duty.Slot < 0 { + err = errNegative + } + + return nil, false, err + }, + serverOpt..., + ) + + protocolNotSupported := func() bool { + // Client supports ONLY delimited protocol while Server supports ONLY non-delimited protocol. + if clientBasicProtoID == delimitedID && !delimitedServer { + return true + } + + // Server supports ONLY delimited protocol while Client supports ONLY non-delimited protocol. + if serverBasicProtoID == delimitedID && !delimitedClient { + return true + } + + return false + } + + if protocolNotSupported() { + err := p2p.Send(ctx, client, clientBasicProtoID, server.ID(), &pbv1.Duty{Slot: 100}, clientOpt...) + require.ErrorContains(t, err, "protocols not supported") + + return + } + + t.Run("server error", func(t *testing.T) { + err := p2p.Send(ctx, client, clientBasicProtoID, server.ID(), &pbv1.Duty{Slot: -1}, clientOpt...) + require.NoError(t, err) + require.ErrorContains(t, <-serverErrChan, "negative slot") + }) + + t.Run("ok", func(t *testing.T) { + err := p2p.Send(ctx, client, clientBasicProtoID, server.ID(), &pbv1.Duty{Slot: 100}, clientOpt...) + require.NoError(t, err) + require.NoError(t, <-serverErrChan) + }) +}