Skip to content

Commit

Permalink
p2p: add support for multi protocols to Send (#2387)
Browse files Browse the repository at this point in the history
Add support for multi protocols to `p2p.Send(...)`.

category: bug
ticket: #2386
  • Loading branch information
dB2510 authored Jul 3, 2023
1 parent 21b8265 commit aba2bb5
Show file tree
Hide file tree
Showing 3 changed files with 283 additions and 25 deletions.
117 changes: 93 additions & 24 deletions p2p/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,45 +20,93 @@ import (
)

func TestSendReceive(t *testing.T) {
var (
undelimID = protocol.ID("undelimited")
delimID = protocol.ID("delimited")
)

tests := []struct {
name string
delimitedClient bool
delimitedServer bool
name string
delimitedClient bool
delimitedServer bool
clientBasicProtoID protocol.ID
serverBasicProtoID protocol.ID
}{
{
name: "non-delimited client and server",
delimitedClient: false,
delimitedServer: false,
name: "non-delimited client and server",
delimitedClient: false,
delimitedServer: false,
clientBasicProtoID: undelimID,
serverBasicProtoID: undelimID,
},
{
name: "delimited client and server",
delimitedClient: true,
delimitedServer: true,
clientBasicProtoID: undelimID,
serverBasicProtoID: undelimID,
},
{
name: "delimited client and non-delimited server",
delimitedClient: true,
delimitedServer: false,
clientBasicProtoID: undelimID,
serverBasicProtoID: undelimID,
},
{
name: "non-delimited client and delimited server",
delimitedClient: false,
delimitedServer: true,
clientBasicProtoID: undelimID,
serverBasicProtoID: undelimID,
},
{
name: "delimited only client and delimited server",
delimitedClient: true,
delimitedServer: true,
clientBasicProtoID: delimID,
serverBasicProtoID: undelimID,
},
{
name: "delimited client and server",
delimitedClient: true,
delimitedServer: true,
name: "delimited client and delimited only server",
delimitedClient: true,
delimitedServer: true,
clientBasicProtoID: undelimID,
serverBasicProtoID: delimID,
},
{
name: "delimited client and non-delimited server",
delimitedClient: true,
delimitedServer: false,
name: "delimited only client and delimited only server",
delimitedClient: true,
delimitedServer: true,
clientBasicProtoID: delimID,
serverBasicProtoID: delimID,
},
{
name: "non-delimited client and delimited server",
delimitedClient: false,
delimitedServer: true,
name: "delimited only client and non-delimited server, protocols not supported",
delimitedClient: true,
delimitedServer: false,
clientBasicProtoID: delimID,
serverBasicProtoID: undelimID,
},
{
name: "non-delimited client and delimited only server, protocols not supported",
delimitedClient: false,
delimitedServer: true,
clientBasicProtoID: undelimID,
serverBasicProtoID: delimID,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testSendReceive(t, test.delimitedClient, test.delimitedServer)
testSendReceive(t, test.clientBasicProtoID, test.serverBasicProtoID, delimID, test.delimitedClient, test.delimitedServer)
})
}
}

func testSendReceive(t *testing.T, delimitedClient, delimitedServer bool) {
func testSendReceive(t *testing.T, clientBasicProtoID, serverBasicProtoID, delimitedID protocol.ID, delimitedClient, delimitedServer bool) {
t.Helper()

var (
pID1 = protocol.ID("undelimited")
pID2 = protocol.ID("delimited")
errNegative = errors.New("negative slot")
ctx = context.Background()
server = testutil.CreateHost(t, testutil.AvailableAddr(t))
Expand All @@ -67,21 +115,21 @@ func testSendReceive(t *testing.T, delimitedClient, delimitedServer bool) {

var serverOpt []p2p.SendRecvOption
if delimitedServer {
serverOpt = append(serverOpt, p2p.WithDelimitedProtocol(pID2))
serverOpt = append(serverOpt, p2p.WithDelimitedProtocol(delimitedID))
}

var clientOpt []p2p.SendRecvOption
if delimitedClient {
clientOpt = append(clientOpt, p2p.WithDelimitedProtocol(pID2))
clientOpt = append(clientOpt, p2p.WithDelimitedProtocol(delimitedID))
}

client.Peerstore().AddAddrs(server.ID(), server.Addrs(), peerstore.PermanentAddrTTL)

// Register the server handler that either:
// - Errors is slot is negative
// - Errors if slot is negative
// - Echos the duty request if slot is even
// - Returns nothing is slot is odd
p2p.RegisterHandler("server", server, pID1,
p2p.RegisterHandler("server", server, serverBasicProtoID,
func() proto.Message { return new(pbv1.Duty) },
func(ctx context.Context, peerID peer.ID, req proto.Message) (proto.Message, bool, error) {
log.Info(ctx, "See protocol logging field")
Expand All @@ -103,11 +151,32 @@ func testSendReceive(t *testing.T, delimitedClient, delimitedServer bool) {

sendReceive := func(slot int64) (*pbv1.Duty, error) {
resp := new(pbv1.Duty)
err := p2p.SendReceive(ctx, client, server.ID(), &pbv1.Duty{Slot: slot}, resp, pID1, clientOpt...)
err := p2p.SendReceive(ctx, client, server.ID(), &pbv1.Duty{Slot: slot}, resp, clientBasicProtoID, clientOpt...)

return resp, err
}

protocolNotSupported := func() bool {
// Client supports ONLY delimited protocol while Server supports ONLY non-delimited protocol.
if clientBasicProtoID == delimitedID && !delimitedServer {
return true
}

// Server supports ONLY delimited protocol while Client supports ONLY non-delimited protocol.
if serverBasicProtoID == delimitedID && !delimitedClient {
return true
}

return false
}

if protocolNotSupported() {
_, err := sendReceive(100)
require.ErrorContains(t, err, "protocols not supported")

return
}

t.Run("server error", func(t *testing.T) {
_, err := sendReceive(-1)
if delimitedClient && delimitedServer {
Expand Down
2 changes: 1 addition & 1 deletion p2p/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func Send(ctx context.Context, tcpNode host.Host, protoID protocol.ID, peerID pe
opt(&o)
}
// Circuit relay connections are transient
s, err := tcpNode.NewStream(network.WithUseTransient(ctx, ""), peerID, protoID)
s, err := tcpNode.NewStream(network.WithUseTransient(ctx, ""), peerID, o.protocols...)
if err != nil {
return errors.Wrap(err, "tcpNode stream")
}
Expand Down
189 changes: 189 additions & 0 deletions p2p/sender_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright © 2022-2023 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1

package p2p_test

import (
"context"
"testing"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
pbv1 "github.com/obolnetwork/charon/core/corepb/v1"
"github.com/obolnetwork/charon/p2p"
"github.com/obolnetwork/charon/testutil"
)

func TestSend(t *testing.T) {
var (
undelimID = protocol.ID("undelimited")
delimID = protocol.ID("delimited")
)

tests := []struct {
name string
delimitedClient bool
delimitedServer bool
clientBasicProtoID protocol.ID
serverBasicProtoID protocol.ID
}{
{
name: "non-delimited client and server",
delimitedClient: false,
delimitedServer: false,
clientBasicProtoID: undelimID,
serverBasicProtoID: undelimID,
},
{
name: "delimited client and server",
delimitedClient: true,
delimitedServer: true,
clientBasicProtoID: undelimID,
serverBasicProtoID: undelimID,
},
{
name: "delimited client and non-delimited server",
delimitedClient: true,
delimitedServer: false,
clientBasicProtoID: undelimID,
serverBasicProtoID: undelimID,
},
{
name: "non-delimited client and delimited server",
delimitedClient: false,
delimitedServer: true,
clientBasicProtoID: undelimID,
serverBasicProtoID: undelimID,
},
{
name: "delimited only client and delimited server",
delimitedClient: true,
delimitedServer: true,
clientBasicProtoID: delimID,
serverBasicProtoID: undelimID,
},
{
name: "delimited client and delimited only server",
delimitedClient: true,
delimitedServer: true,
clientBasicProtoID: undelimID,
serverBasicProtoID: delimID,
},
{
name: "delimited only client and delimited only server",
delimitedClient: true,
delimitedServer: true,
clientBasicProtoID: delimID,
serverBasicProtoID: delimID,
},
{
name: "delimited only client and non-delimited server, protocols not supported",
delimitedClient: true,
delimitedServer: false,
clientBasicProtoID: delimID,
serverBasicProtoID: undelimID,
},
{
name: "non-delimited client and delimited only server, protocols not supported",
delimitedClient: false,
delimitedServer: true,
clientBasicProtoID: undelimID,
serverBasicProtoID: delimID,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testSend(t, test.clientBasicProtoID, test.serverBasicProtoID, delimID, test.delimitedClient, test.delimitedServer)
})
}
}

func testSend(t *testing.T, clientBasicProtoID, serverBasicProtoID, delimitedID protocol.ID, delimitedClient, delimitedServer bool) {
t.Helper()

var (
errNegative = errors.New("negative slot")
ctx = context.Background()
server = testutil.CreateHost(t, testutil.AvailableAddr(t))
client = testutil.CreateHost(t, testutil.AvailableAddr(t))
)

var serverOpt []p2p.SendRecvOption
if delimitedServer {
serverOpt = append(serverOpt, p2p.WithDelimitedProtocol(delimitedID))
}

var clientOpt []p2p.SendRecvOption
if delimitedClient {
clientOpt = append(clientOpt, p2p.WithDelimitedProtocol(delimitedID))
}

client.Peerstore().AddAddrs(server.ID(), server.Addrs(), peerstore.PermanentAddrTTL)

// Catch server errors.
serverErrChan := make(chan error)

// Register the server handler that either:
// - Errors if slot is negative
// - Returns nothing otherwise
p2p.RegisterHandler("server", server, serverBasicProtoID,
func() proto.Message { return new(pbv1.Duty) },
func(ctx context.Context, peerID peer.ID, req proto.Message) (proto.Message, bool, error) {
log.Info(ctx, "See protocol logging field")

require.Equal(t, client.ID(), peerID)
duty, ok := req.(*pbv1.Duty)
require.True(t, ok)

var err error
defer func() {
serverErrChan <- err
}()

if duty.Slot < 0 {
err = errNegative
}

return nil, false, err
},
serverOpt...,
)

protocolNotSupported := func() bool {
// Client supports ONLY delimited protocol while Server supports ONLY non-delimited protocol.
if clientBasicProtoID == delimitedID && !delimitedServer {
return true
}

// Server supports ONLY delimited protocol while Client supports ONLY non-delimited protocol.
if serverBasicProtoID == delimitedID && !delimitedClient {
return true
}

return false
}

if protocolNotSupported() {
err := p2p.Send(ctx, client, clientBasicProtoID, server.ID(), &pbv1.Duty{Slot: 100}, clientOpt...)
require.ErrorContains(t, err, "protocols not supported")

return
}

t.Run("server error", func(t *testing.T) {
err := p2p.Send(ctx, client, clientBasicProtoID, server.ID(), &pbv1.Duty{Slot: -1}, clientOpt...)
require.NoError(t, err)
require.ErrorContains(t, <-serverErrChan, "negative slot")
})

t.Run("ok", func(t *testing.T) {
err := p2p.Send(ctx, client, clientBasicProtoID, server.ID(), &pbv1.Duty{Slot: 100}, clientOpt...)
require.NoError(t, err)
require.NoError(t, <-serverErrChan)
})
}

0 comments on commit aba2bb5

Please sign in to comment.