Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: remove support for legacy stream delimited protocols #2350

Merged
merged 1 commit into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/peerinfo/adhoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func DoOnce(ctx context.Context, tcpNode host.Host, peerID peer.ID) (*pbv1.PeerI

req := new(pbv1.PeerInfo) // TODO(corver): Populate request fields and make them required.
resp := new(pbv1.PeerInfo)
err := p2p.SendReceive(ctx, tcpNode, peerID, req, resp, protocolID1,
p2p.WithSendReceiveRTT(rttCallback), p2p.WithDelimitedProtocol(protocolID2))
err := p2p.SendReceive(ctx, tcpNode, peerID, req, resp, protocolID2,
p2p.WithSendReceiveRTT(rttCallback))
if err != nil {
return nil, 0, false, err
}
Expand Down
13 changes: 5 additions & 8 deletions app/peerinfo/peerinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,13 @@ import (
)

const (
period = time.Minute

protocolID1 protocol.ID = "/charon/peerinfo/1.0.0"
period = time.Minute
protocolID2 protocol.ID = "/charon/peerinfo/2.0.0"
)

// Protocols returns the supported protocols of this package in order of precedence.
func Protocols() []protocol.ID {
return []protocol.ID{protocolID2, protocolID1}
return []protocol.ID{protocolID2}
}

type (
Expand Down Expand Up @@ -82,7 +80,7 @@ func newInternal(tcpNode host.Host, peers []peer.ID, version version.SemVer, loc
startTime := timestamppb.New(nowFunc())

// Register a simple handler that returns our info and ignores the request.
registerHandler("peerinfo", tcpNode, protocolID1,
registerHandler("peerinfo", tcpNode, protocolID2,
func() proto.Message { return new(pbv1.PeerInfo) },
func(context.Context, peer.ID, proto.Message) (proto.Message, bool, error) {
return &pbv1.PeerInfo{
Expand All @@ -93,7 +91,6 @@ func newInternal(tcpNode host.Host, peers []peer.ID, version version.SemVer, loc
StartedAt: startTime,
}, true, nil
},
p2p.WithDelimitedProtocol(protocolID2),
)

// Create log filters
Expand Down Expand Up @@ -173,8 +170,8 @@ func (p *PeerInfo) sendOnce(ctx context.Context, now time.Time) {
}

resp := new(pbv1.PeerInfo)
err := p.sendFunc(ctx, p.tcpNode, peerID, req, resp, protocolID1,
p2p.WithSendReceiveRTT(rttCallback), p2p.WithDelimitedProtocol(protocolID2))
err := p.sendFunc(ctx, p.tcpNode, peerID, req, resp, protocolID2,
p2p.WithSendReceiveRTT(rttCallback))
if err != nil {
return // Logging handled by send func.
} else if resp.SentAt == nil || resp.StartedAt == nil {
Expand Down
7 changes: 3 additions & 4 deletions core/consensus/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ import (

const (
recvBuffer = 100 // Allow buffering some initial messages when this node is late to start an instance.
protocolID1 = "/charon/consensus/qbft/1.0.0"
protocolID2 = "/charon/consensus/qbft/2.0.0"
)

// Protocols returns the supported protocols of this package in order of precedence.
func Protocols() []protocol.ID {
return []protocol.ID{protocolID2, protocolID1}
return []protocol.ID{protocolID2}
}

type subscriber func(ctx context.Context, duty core.Duty, value proto.Message) error
Expand Down Expand Up @@ -227,9 +226,9 @@ func (c *Component) SubscribePriority(fn func(ctx context.Context, duty core.Dut

// Start registers the libp2p receive handler and starts a goroutine that cleans state. This should only be called once.
func (c *Component) Start(ctx context.Context) {
p2p.RegisterHandler("qbft", c.tcpNode, protocolID1,
p2p.RegisterHandler("qbft", c.tcpNode, protocolID2,
func() proto.Message { return new(pbv1.ConsensusMsg) },
c.handle, p2p.WithDelimitedProtocol(protocolID2))
c.handle)

go func() {
for {
Expand Down
4 changes: 1 addition & 3 deletions core/consensus/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/obolnetwork/charon/core"
pbv1 "github.com/obolnetwork/charon/core/corepb/v1"
"github.com/obolnetwork/charon/core/qbft"
"github.com/obolnetwork/charon/p2p"
)

// transport encapsulates receiving and broadcasting for a consensus instance/duty.
Expand Down Expand Up @@ -129,8 +128,7 @@ func (t *transport) Broadcast(ctx context.Context, typ qbft.MsgType, duty core.D
continue
}

err = t.component.sender.SendAsync(ctx, t.component.tcpNode, protocolID1, p.ID, msg.ToConsensusMsg(),
p2p.WithDelimitedProtocol(protocolID2))
err = t.component.sender.SendAsync(ctx, t.component.tcpNode, protocolID2, p.ID, msg.ToConsensusMsg())
if err != nil {
return err
}
Expand Down
7 changes: 3 additions & 4 deletions core/parsigex/parsigex.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ import (
)

const (
protocolID1 = "/charon/parsigex/1.0.0"
protocolID2 = "/charon/parsigex/2.0.0"
)

// Protocols returns the supported protocols of this package in order of precedence.
func Protocols() []protocol.ID {
return []protocol.ID{protocolID2, protocolID1}
return []protocol.ID{protocolID2}
}

func NewParSigEx(tcpNode host.Host, sendFunc p2p.SendFunc, peerIdx int, peers []peer.ID, verifyFunc func(context.Context, core.Duty, core.PubKey, core.ParSignedData) error) *ParSigEx {
Expand All @@ -40,7 +39,7 @@ func NewParSigEx(tcpNode host.Host, sendFunc p2p.SendFunc, peerIdx int, peers []
}

newReq := func() proto.Message { return new(pbv1.ParSigExMsg) }
p2p.RegisterHandler("parsigex", tcpNode, protocolID1, newReq, parSigEx.handle, p2p.WithDelimitedProtocol(protocolID2))
p2p.RegisterHandler("parsigex", tcpNode, protocolID2, newReq, parSigEx.handle)

return parSigEx
}
Expand Down Expand Up @@ -115,7 +114,7 @@ func (m *ParSigEx) Broadcast(ctx context.Context, duty core.Duty, set core.ParSi
continue
}

if err := m.sendFunc(ctx, m.tcpNode, protocolID1, p, &msg, p2p.WithDelimitedProtocol(protocolID2)); err != nil {
if err := m.sendFunc(ctx, m.tcpNode, protocolID2, p, &msg); err != nil {
return err
}
}
Expand Down
10 changes: 4 additions & 6 deletions core/priority/prioritiser.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,12 @@ import (
)

const (
protocolID1 = "charon/priority/1.1.0"
protocolID2 = "charon/priority/2.0.0"
)

// Protocols returns the supported protocols of this package in order of precedence.
func Protocols() []protocol.ID {
return []protocol.ID{protocolID2, protocolID1}
return []protocol.ID{protocolID2}
}

// Topic groups priorities in an instance.
Expand Down Expand Up @@ -118,7 +117,7 @@ func newInternal(tcpNode host.Host, peers []peer.ID, minRequired int,
})

// Register prioritiser protocol handler.
registerHandlerFunc("priority", tcpNode, protocolID1,
registerHandlerFunc("priority", tcpNode, protocolID2,
func() proto.Message { return new(pbv1.PriorityMsg) },
func(ctx context.Context, pID peer.ID, msg proto.Message) (proto.Message, bool, error) {
prioMsg, ok := msg.(*pbv1.PriorityMsg)
Expand All @@ -133,8 +132,7 @@ func newInternal(tcpNode host.Host, peers []peer.ID, minRequired int,
}

return resp, true, nil
},
p2p.WithDelimitedProtocol(protocolID2))
})

return p
}
Expand Down Expand Up @@ -333,7 +331,7 @@ func exchange(ctx context.Context, tcpNode host.Host, peers []peer.ID, msgValida

go func(pID peer.ID) {
response := new(pbv1.PriorityMsg)
err := sendFunc(ctx, tcpNode, pID, own, response, protocolID1, p2p.WithDelimitedProtocol(protocolID2))
err := sendFunc(ctx, tcpNode, pID, own, response, protocolID2)
if err != nil {
// No need to log, since transport will do it.
return
Expand Down
4 changes: 2 additions & 2 deletions dkg/bcast/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (c *client) Broadcast(ctx context.Context, msgID string, msg proto.Message)

fork, join, cancel := forkjoin.New(ctx, func(ctx context.Context, pID peer.ID) (*pb.BCastSigResponse, error) {
sigResp := new(pb.BCastSigResponse)
err := c.sendRecvFunc(ctx, c.tcpNode, pID, sigReq, sigResp, protocolIDSig, p2p.WithDelimitedProtocol(protocolIDSig))
err := c.sendRecvFunc(ctx, c.tcpNode, pID, sigReq, sigResp, protocolIDSig)

return sigResp, err
})
Expand Down Expand Up @@ -133,7 +133,7 @@ func (c *client) Broadcast(ctx context.Context, msgID string, msg proto.Message)
continue // Skip self.
}

err := c.sendFunc(ctx, c.tcpNode, protocolIDMsg, pID, bcastMsg, p2p.WithDelimitedProtocol(protocolIDMsg))
err := c.sendFunc(ctx, c.tcpNode, protocolIDMsg, pID, bcastMsg)
if err != nil {
return errors.Wrap(err, "send message")
}
Expand Down
2 changes: 0 additions & 2 deletions dkg/bcast/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@ func newServer(tcpNode host.Host, signFunc signFunc, verifyFunc verifyFunc) *ser
p2p.RegisterHandler("bcast", tcpNode, protocolIDSig,
func() proto.Message { return new(pb.BCastSigRequest) },
s.handleSigRequest,
p2p.WithDelimitedProtocol(protocolIDSig),
)

p2p.RegisterHandler("bcast", tcpNode, protocolIDMsg,
func() proto.Message { return new(pb.BCastMessage) },
s.handleMessage,
p2p.WithDelimitedProtocol(protocolIDMsg),
)

return s
Expand Down
3 changes: 1 addition & 2 deletions dkg/frostp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func newFrostP2P(tcpNode host.Host, peers map[peer.ID]cluster.NodeIdx, bcastComp
p2p.RegisterHandler("frost", tcpNode, round1P2PID,
func() proto.Message { return new(pb.FrostRound1P2P) },
newP2PCallback(tcpNode, peers, round1P2PRecv, numVals),
p2p.WithDelimitedProtocol(round1P2PID),
)

bcastCallback := newBcastCallback(peers, round1CastsRecv, round2CastsRecv, threshold, numVals)
Expand Down Expand Up @@ -238,7 +237,7 @@ func (f *frostP2P) Round1(ctx context.Context, castR1 map[msgKey]frost.Round1Bca
return nil, nil, errors.New("bug: unexpected p2p message to self")
}

err := p2p.Send(ctx, f.tcpNode, round1P2PID, pID, p2pMsg, p2p.WithDelimitedProtocol(round1P2PID))
err := p2p.Send(ctx, f.tcpNode, round1P2PID, pID, p2pMsg)
if err != nil {
return nil, nil, err
}
Expand Down
66 changes: 5 additions & 61 deletions p2p/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,68 +20,21 @@ import (
)

func TestSendReceive(t *testing.T) {
tests := []struct {
name string
delimitedClient bool
delimitedServer bool
}{
{
name: "non-delimited client and server",
delimitedClient: false,
delimitedServer: false,
},
{
name: "delimited client and server",
delimitedClient: true,
delimitedServer: true,
},
{
name: "delimited client and non-delimited server",
delimitedClient: true,
delimitedServer: false,
},
{
name: "non-delimited client and delimited server",
delimitedClient: false,
delimitedServer: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testSendReceive(t, test.delimitedClient, test.delimitedServer)
})
}
}

func testSendReceive(t *testing.T, delimitedClient, delimitedServer bool) {
t.Helper()

var (
pID1 = protocol.ID("undelimited")
pID2 = protocol.ID("delimited")
pID = protocol.ID("delimited")
errNegative = errors.New("negative slot")
ctx = context.Background()
server = testutil.CreateHost(t, testutil.AvailableAddr(t))
client = testutil.CreateHost(t, testutil.AvailableAddr(t))
)

var serverOpt []p2p.SendRecvOption
if delimitedServer {
serverOpt = append(serverOpt, p2p.WithDelimitedProtocol(pID2))
}

var clientOpt []p2p.SendRecvOption
if delimitedClient {
clientOpt = append(clientOpt, p2p.WithDelimitedProtocol(pID2))
}

client.Peerstore().AddAddrs(server.ID(), server.Addrs(), peerstore.PermanentAddrTTL)

// Register the server handler that either:
// - Errors is slot is negative
// - Echos the duty request if slot is even
// - Returns nothing is slot is odd
p2p.RegisterHandler("server", server, pID1,
p2p.RegisterHandler("server", server, pID,
func() proto.Message { return new(pbv1.Duty) },
func(ctx context.Context, peerID peer.ID, req proto.Message) (proto.Message, bool, error) {
log.Info(ctx, "See protocol logging field")
Expand All @@ -98,23 +51,18 @@ func testSendReceive(t *testing.T, delimitedClient, delimitedServer bool) {
return nil, false, nil
}
},
serverOpt...,
)

sendReceive := func(slot int64) (*pbv1.Duty, error) {
resp := new(pbv1.Duty)
err := p2p.SendReceive(ctx, client, server.ID(), &pbv1.Duty{Slot: slot}, resp, pID1, clientOpt...)
err := p2p.SendReceive(ctx, client, server.ID(), &pbv1.Duty{Slot: slot}, resp, pID)

return resp, err
}

t.Run("server error", func(t *testing.T) {
_, err := sendReceive(-1)
if delimitedClient && delimitedServer {
require.ErrorContains(t, err, "read response: EOF")
} else {
require.ErrorContains(t, err, "no or zero response received")
}
require.ErrorContains(t, err, "read response: EOF")
})

t.Run("ok", func(t *testing.T) {
Expand All @@ -126,10 +74,6 @@ func testSendReceive(t *testing.T, delimitedClient, delimitedServer bool) {

t.Run("empty response", func(t *testing.T) {
_, err := sendReceive(101)
if delimitedClient && delimitedServer {
require.ErrorContains(t, err, "read response: EOF")
} else {
require.ErrorContains(t, err, "no or zero response received")
}
require.ErrorContains(t, err, "read response: EOF")
})
}
Loading