Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add p2p.Network component #2283

Merged
merged 39 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
c073533
p2p network
joshua-kim Nov 7, 2023
ec6b9f9
add test
joshua-kim Nov 9, 2023
b84be08
nit
joshua-kim Nov 9, 2023
74765cb
nit
joshua-kim Nov 9, 2023
c54c7d5
go mod
joshua-kim Nov 9, 2023
f61113e
refactor validators
joshua-kim Nov 15, 2023
f377a8b
nit
joshua-kim Nov 15, 2023
2995647
nit
joshua-kim Nov 15, 2023
accd487
nit
joshua-kim Nov 15, 2023
be9e25a
nit
joshua-kim Nov 15, 2023
e1b726d
Merge branch 'dev' into network
joshua-kim Nov 17, 2023
f35fc75
nit
joshua-kim Nov 17, 2023
bc0c84a
Merge branch 'dev' into network
joshua-kim Nov 17, 2023
15a7af6
Merge branch 'dev' into network
joshua-kim Nov 21, 2023
2627182
Update network/p2p/validators.go
joshua-kim Nov 28, 2023
5b1d64a
Update network/p2p/network.go
joshua-kim Nov 28, 2023
f62f91a
Update network/p2p/network_test.go
joshua-kim Nov 28, 2023
4854748
Update network/p2p/network_test.go
joshua-kim Nov 28, 2023
d0440e8
Update network/p2p/validators_test.go
joshua-kim Nov 28, 2023
35ef6fe
unexport clientOptions
joshua-kim Nov 28, 2023
97bf26f
fix
joshua-kim Nov 28, 2023
e741d12
fix bug
joshua-kim Nov 28, 2023
1568dc7
nit
joshua-kim Nov 28, 2023
b6565dd
nit
joshua-kim Nov 28, 2023
438a0ab
nit
joshua-kim Nov 28, 2023
7c4a2af
Merge branch 'dev' into network
joshua-kim Nov 28, 2023
3cc6218
fix
joshua-kim Nov 28, 2023
af2740d
nit
joshua-kim Nov 28, 2023
e2781cf
nit
joshua-kim Nov 28, 2023
f512ead
nit
joshua-kim Nov 28, 2023
741f612
nti
joshua-kim Nov 28, 2023
bfa0638
Update network/p2p/validators.go
joshua-kim Nov 29, 2023
4701b31
nit
joshua-kim Nov 29, 2023
3a8cca5
Merge branch 'dev' into network
joshua-kim Nov 29, 2023
ee0eb08
nit
joshua-kim Nov 29, 2023
5fde830
tidy
joshua-kim Nov 29, 2023
6870966
Merge branch 'dev' into network
StephenButtolph Nov 29, 2023
7295103
Update coreth
StephenButtolph Nov 30, 2023
521f8c8
Merge branch 'dev' into network
StephenButtolph Nov 30, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/DataDog/zstd v1.5.2
github.com/Microsoft/go-winio v0.5.2
github.com/NYTimes/gziphandler v1.1.1
github.com/ava-labs/coreth v0.12.9-rc.0
github.com/ava-labs/coreth v0.12.8-rc.1.0.20231115212839-97af976b2d5a
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34
github.com/btcsuite/btcd/btcutil v1.1.3
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/coreth v0.12.9-rc.0 h1:Xvk/iJTY2MSBkkiOs9Eo92nxd67VXzRjaC/WmQXRIb0=
github.com/ava-labs/coreth v0.12.9-rc.0/go.mod h1:rECKQfGFDeodrwGPlJSvFUJDbVr30jSMIVjQLi6pNX4=
github.com/ava-labs/coreth v0.12.8-rc.1.0.20231115212839-97af976b2d5a h1:A4RdXCgsooZWqEDA430V7bLPTPdIoiGfd4djFUTW4Ek=
github.com/ava-labs/coreth v0.12.8-rc.1.0.20231115212839-97af976b2d5a/go.mod h1:nYEP2+B0GDW9b8H+cRNs1nGVgB5CdXjP+lJVpQMwi6o=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34 h1:mg9Uw6oZFJKytJxgxnl3uxZOs/SB8CVHg6Io4Tf99Zc=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34/go.mod h1:pJxaT9bUgeRNVmNRgtCHb7sFDIRKy7CzTQVi8gGNT6g=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
Expand Down
7 changes: 3 additions & 4 deletions network/p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ type CrossChainAppResponseCallback func(
type Client struct {
handlerID uint64
handlerPrefix []byte
router *Router
router *router
sender common.AppSender
// nodeSampler is used to select nodes to route AppRequestAny to
nodeSampler NodeSampler
options *ClientOptions
}

// AppRequestAny issues an AppRequest to an arbitrary node decided by Client.
Expand All @@ -56,7 +55,7 @@ func (c *Client) AppRequestAny(
appRequestBytes []byte,
onResponse AppResponseCallback,
) error {
sampled := c.nodeSampler.Sample(ctx, 1)
sampled := c.options.NodeSampler.Sample(ctx, 1)
if len(sampled) != 1 {
return ErrNoPeers
}
Expand Down
42 changes: 19 additions & 23 deletions network/p2p/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (

"github.com/stretchr/testify/require"

"go.uber.org/mock/gomock"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
Expand Down Expand Up @@ -117,10 +115,9 @@ func TestGossiperGossip(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
ctrl := gomock.NewController(t)

responseSender := common.NewMockSender(ctrl)
responseRouter := p2p.NewRouter(logging.NoLog{}, responseSender, prometheus.NewRegistry(), "")
responseSender := &common.SenderTest{}
responseNetwork := p2p.NewNetwork(logging.NoLog{}, responseSender, prometheus.NewRegistry(), "")
responseBloom, err := NewBloomFilter(1000, 0.01)
require.NoError(err)
responseSet := testSet{
Expand All @@ -130,31 +127,30 @@ func TestGossiperGossip(t *testing.T) {
for _, item := range tt.responder {
require.NoError(responseSet.Add(item))
}
peers := &p2p.Peers{}
require.NoError(peers.Connected(context.Background(), ids.EmptyNodeID, nil))

handler, err := NewHandler[*testTx](responseSet, tt.config, prometheus.NewRegistry())
require.NoError(err)
_, err = responseRouter.RegisterAppProtocol(0x0, handler, peers)
_, err = responseNetwork.RegisterAppProtocol(0x0, handler)
require.NoError(err)

requestSender := common.NewMockSender(ctrl)
requestRouter := p2p.NewRouter(logging.NoLog{}, requestSender, prometheus.NewRegistry(), "")

gossiped := make(chan struct{})
requestSender.EXPECT().SendAppRequest(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Do(func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, request []byte) {
requestSender := &common.SenderTest{
SendAppRequestF: func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, request []byte) error {
go func() {
require.NoError(responseRouter.AppRequest(ctx, ids.EmptyNodeID, requestID, time.Time{}, request))
require.NoError(responseNetwork.AppRequest(ctx, ids.EmptyNodeID, requestID, time.Time{}, request))
}()
}).AnyTimes()
return nil
},
}

responseSender.EXPECT().
SendAppResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Do(func(ctx context.Context, nodeID ids.NodeID, requestID uint32, appResponseBytes []byte) {
require.NoError(requestRouter.AppResponse(ctx, nodeID, requestID, appResponseBytes))
close(gossiped)
}).AnyTimes()
requestNetwork := p2p.NewNetwork(logging.NoLog{}, requestSender, prometheus.NewRegistry(), "")
require.NoError(requestNetwork.Connected(context.Background(), ids.EmptyNodeID, nil))

gossiped := make(chan struct{})
responseSender.SendAppResponseF = func(ctx context.Context, nodeID ids.NodeID, requestID uint32, appResponseBytes []byte) error {
require.NoError(requestNetwork.AppResponse(ctx, nodeID, requestID, appResponseBytes))
close(gossiped)
return nil
}

bloom, err := NewBloomFilter(1000, 0.01)
require.NoError(err)
Expand All @@ -166,7 +162,7 @@ func TestGossiperGossip(t *testing.T) {
require.NoError(requestSet.Add(item))
}

requestClient, err := requestRouter.RegisterAppProtocol(0x0, nil, peers)
requestClient, err := requestNetwork.RegisterAppProtocol(0x0, nil)
require.NoError(err)

config := Config{
Expand Down
241 changes: 241 additions & 0 deletions network/p2p/network.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package p2p

import (
"context"
"encoding/binary"
"fmt"
"sync"

"github.com/prometheus/client_golang/prometheus"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/engine/common"
snowvalidators "github.com/ava-labs/avalanchego/snow/validators"
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/metric"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/version"
)

var (
_ snowvalidators.Connector = (*Network)(nil)
_ common.AppHandler = (*Network)(nil)
_ NodeSampler = (*peers)(nil)
)

// ClientOption configures Client
type ClientOption interface {
apply(options *ClientOptions)
}

type clientOptionFunc func(options *ClientOptions)

func (o clientOptionFunc) apply(options *ClientOptions) {
o(options)
}

// WithPeerSampling configures Client.AppRequestAny to sample peers
func WithPeerSampling(network *Network) ClientOption {
return clientOptionFunc(func(options *ClientOptions) {
options.NodeSampler = network.peers
})
}

// WithValidatorSampling configures Client.AppRequestAny to sample validators
func WithValidatorSampling(validators *Validators) ClientOption {
return clientOptionFunc(func(options *ClientOptions) {
options.NodeSampler = validators
})
}

// ClientOptions holds client-configurable values
type ClientOptions struct {
// NodeSampler is used to select nodes to route Client.AppRequestAny to
NodeSampler NodeSampler
}
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved

// NewNetwork returns an instance of Network
func NewNetwork(
log logging.Logger,
sender common.AppSender,
metrics prometheus.Registerer,
namespace string,
) *Network {
return &Network{
log: log,
sender: sender,
metrics: metrics,
namespace: namespace,
router: newRouter(log),
peers: &peers{},
}
}

// Network maintains state of the peer-to-peer network and any in-flight
// requests
type Network struct {
log logging.Logger
sender common.AppSender
metrics prometheus.Registerer
namespace string

*router
peers *peers
}

func (n *Network) Connected(_ context.Context, nodeID ids.NodeID, _ *version.Application) error {
n.lock.Lock()
defer n.lock.Unlock()

n.peers.set.Add(nodeID)
return nil
}

func (n *Network) Disconnected(_ context.Context, nodeID ids.NodeID) error {
n.lock.Lock()
defer n.lock.Unlock()

n.peers.set.Remove(nodeID)
return nil
}

// RegisterAppProtocol reserves an identifier for an application protocol and
// returns a Client that can be used to send messages for the corresponding
// protocol.
func (n *Network) RegisterAppProtocol(handlerID uint64, handler Handler, options ...ClientOption) (*Client, error) {
// TODO refactor router
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
n.router.lock.Lock()
defer n.router.lock.Unlock()

if _, ok := n.router.handlers[handlerID]; ok {
return nil, fmt.Errorf("failed to register handler id %d: %w", handlerID, ErrExistingAppProtocol)
}

appRequestTime, err := metric.NewAverager(
n.namespace,
fmt.Sprintf("handler_%d_app_request", handlerID),
"app request time (ns)",
n.metrics,
)
if err != nil {
return nil, fmt.Errorf("failed to register app request metric for handler_%d: %w", handlerID, err)
}

appRequestFailedTime, err := metric.NewAverager(
n.namespace,
fmt.Sprintf("handler_%d_app_request_failed", handlerID),
"app request failed time (ns)",
n.metrics,
)
if err != nil {
return nil, fmt.Errorf("failed to register app request failed metric for handler_%d: %w", handlerID, err)
}

appResponseTime, err := metric.NewAverager(
n.namespace,
fmt.Sprintf("handler_%d_app_response", handlerID),
"app response time (ns)",
n.metrics,
)
if err != nil {
return nil, fmt.Errorf("failed to register app response metric for handler_%d: %w", handlerID, err)
}

appGossipTime, err := metric.NewAverager(
n.namespace,
fmt.Sprintf("handler_%d_app_gossip", handlerID),
"app gossip time (ns)",
n.metrics,
)
if err != nil {
return nil, fmt.Errorf("failed to register app gossip metric for handler_%d: %w", handlerID, err)
}

crossChainAppRequestTime, err := metric.NewAverager(
n.namespace,
fmt.Sprintf("handler_%d_cross_chain_app_request", handlerID),
"cross chain app request time (ns)",
n.metrics,
)
if err != nil {
return nil, fmt.Errorf("failed to register cross-chain app request metric for handler_%d: %w", handlerID, err)
}

crossChainAppRequestFailedTime, err := metric.NewAverager(
n.namespace,
fmt.Sprintf("handler_%d_cross_chain_app_request_failed", handlerID),
"app request failed time (ns)",
n.metrics,
)
if err != nil {
return nil, fmt.Errorf("failed to register cross-chain app request failed metric for handler_%d: %w", handlerID, err)
}

crossChainAppResponseTime, err := metric.NewAverager(
n.namespace,
fmt.Sprintf("handler_%d_cross_chain_app_response", handlerID),
"cross chain app response time (ns)",
n.metrics,
)
if err != nil {
return nil, fmt.Errorf("failed to register cross-chain app response metric for handler_%d: %w", handlerID, err)
}

n.router.handlers[handlerID] = &meteredHandler{
responder: &responder{
Handler: handler,
handlerID: handlerID,
log: n.log,
sender: n.sender,
},
metrics: &metrics{
appRequestTime: appRequestTime,
appRequestFailedTime: appRequestFailedTime,
appResponseTime: appResponseTime,
appGossipTime: appGossipTime,
crossChainAppRequestTime: crossChainAppRequestTime,
crossChainAppRequestFailedTime: crossChainAppRequestFailedTime,
crossChainAppResponseTime: crossChainAppResponseTime,
},
}
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved

clientOptions := &ClientOptions{
NodeSampler: n.peers,
}

for _, option := range options {
option.apply(clientOptions)
}

return &Client{
handlerID: handlerID,
handlerPrefix: binary.AppendUvarint(nil, handlerID),
sender: n.sender,
router: n.router,
options: clientOptions,
}, nil
}

// peers contains the set of nodes we are connected to
type peers struct {
lock sync.RWMutex
set set.SampleableSet[ids.NodeID]
}

func (p *peers) has(nodeID ids.NodeID) bool {
p.lock.RLock()
defer p.lock.RUnlock()

return p.set.Contains(nodeID)
}

// Sample returns a pseudo-random sample of up to limit peers
func (p *peers) Sample(_ context.Context, limit int) []ids.NodeID {
p.lock.RLock()
defer p.lock.RUnlock()

return p.set.Sample(limit)
}
Loading
Loading