diff --git a/pkg/network/capability/capability.go b/pkg/network/capability/capability.go new file mode 100644 index 0000000000..153960d2cc --- /dev/null +++ b/pkg/network/capability/capability.go @@ -0,0 +1,71 @@ +package capability + +import ( + "errors" + + "github.com/nspcc-dev/neo-go/pkg/io" +) + +// MaxCapabilities is the maximum number of capabilities per payload +const MaxCapabilities = 32 + +// Capability describes network service available for node +type Capability struct { + Type Type + Data io.Serializable +} + +// DecodeBinary implements Serializable interface. +func (c *Capability) DecodeBinary(br *io.BinReader) { + c.Type = Type(br.ReadB()) + switch c.Type { + case FullNode: + c.Data = &Node{} + case TCPServer, WSServer: + c.Data = &Server{} + default: + br.Err = errors.New("unknown node capability type") + } + c.Data.DecodeBinary(br) +} + +// EncodeBinary implements Serializable interface. +func (c *Capability) EncodeBinary(bw *io.BinWriter) { + bw.WriteB(byte(c.Type)) + if c.Data == nil { + bw.Err = errors.New("capability has no data") + return + } + c.Data.EncodeBinary(bw) +} + +// Node represents full node capability with start height +type Node struct { + StartHeight uint32 +} + +// DecodeBinary implements Serializable interface. +func (n *Node) DecodeBinary(br *io.BinReader) { + n.StartHeight = br.ReadU32LE() +} + +// EncodeBinary implements Serializable interface. +func (n *Node) EncodeBinary(bw *io.BinWriter) { + bw.WriteU32LE(n.StartHeight) +} + +// Server represents TCP or WS server capability with port +type Server struct { + // Port is the port this server is listening on + Port uint16 +} + +// DecodeBinary implements Serializable interface. +func (s *Server) DecodeBinary(br *io.BinReader) { + s.Port = br.ReadU16LE() +} + +// EncodeBinary implements Serializable interface. +func (s *Server) EncodeBinary(bw *io.BinWriter) { + bw.WriteU16LE(s.Port) +} diff --git a/pkg/network/capability/type.go b/pkg/network/capability/type.go new file mode 100644 index 0000000000..b25b153979 --- /dev/null +++ b/pkg/network/capability/type.go @@ -0,0 +1,13 @@ +package capability + +// Type represents node capability type +type Type byte + +const ( + // TCPServer represents TCP node capability type + TCPServer Type = 0x01 + // WSServer represents WebSocket node capability type + WSServer Type = 0x02 + // FullNode represents full node capability type + FullNode Type = 0x10 +) diff --git a/pkg/network/discovery_test.go b/pkg/network/discovery_test.go index 9105cdc0b3..81469def24 100644 --- a/pkg/network/discovery_test.go +++ b/pkg/network/discovery_test.go @@ -29,6 +29,9 @@ func (ft *fakeTransp) Accept() { func (ft *fakeTransp) Proto() string { return "" } +func (ft *fakeTransp) Address() string { + return "" +} func (ft *fakeTransp) Close() { } func TestDefaultDiscoverer(t *testing.T) { diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 411afc88d6..bdec398f96 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -1,11 +1,11 @@ package network import ( + "fmt" "math/rand" "net" "sync/atomic" "testing" - "time" "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/block" @@ -163,15 +163,6 @@ func (d testDiscovery) RequestRemote(n int) {} func (d testDiscovery) BadPeers() []string { return []string{} } func (d testDiscovery) GoodPeers() []string { return []string{} } -type localTransport struct{} - -func (t localTransport) Dial(addr string, timeout time.Duration) error { - return nil -} -func (t localTransport) Accept() {} -func (t localTransport) Proto() string { return "local" } -func (t localTransport) Close() {} - var defaultMessageHandler = func(t *testing.T, msg *Message) {} type localPeer struct { @@ -267,11 +258,10 @@ func (p *localPeer) Handshaked() bool { return p.handshaked } -func newTestServer(t *testing.T) *Server { - return &Server{ - ServerConfig: ServerConfig{}, +func newTestServer(t *testing.T, serverConfig ServerConfig) *Server { + s := &Server{ + ServerConfig: serverConfig, chain: &testChain{}, - transport: localTransport{}, discovery: testDiscovery{}, id: rand.Uint32(), quit: make(chan struct{}), @@ -280,5 +270,6 @@ func newTestServer(t *testing.T) *Server { peers: make(map[Peer]bool), log: zaptest.NewLogger(t), } - + s.transport = NewTCPTransport(s, fmt.Sprintf("%s:%d", s.ServerConfig.Address, s.ServerConfig.Port), s.log) + return s } diff --git a/pkg/network/payload/version.go b/pkg/network/payload/version.go index 86be2b80c0..ef563c24aa 100644 --- a/pkg/network/payload/version.go +++ b/pkg/network/payload/version.go @@ -5,19 +5,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/io" -) - -// Size of the payload not counting UserAgent encoding (which is at least 1 byte -// for zero-length string). -const minVersionSize = 27 - -// List of Services offered by the node. -const ( - nodePeerService uint64 = 1 - // BloomFilerService uint64 = 2 // Not implemented - // PrunedNode uint64 = 3 // Not implemented - // LightNode uint64 = 4 // Not implemented - + "github.com/nspcc-dev/neo-go/pkg/network/capability" ) // Version payload. @@ -26,34 +14,25 @@ type Version struct { Magic config.NetMode // currently the version of the protocol is 0 Version uint32 - // currently 1 - Services uint64 // timestamp Timestamp uint32 - // port this server is listening on - Port uint16 // it's used to distinguish the node from public IP Nonce uint32 // client id UserAgent []byte - // Height of the block chain - StartHeight uint32 - // Whether to receive and forward - Relay bool + // List of available network services + Capabilities []capability.Capability } // NewVersion returns a pointer to a Version payload. -func NewVersion(magic config.NetMode, id uint32, p uint16, ua string, h uint32, r bool) *Version { +func NewVersion(magic config.NetMode, id uint32, ua string, c []capability.Capability) *Version { return &Version{ - Magic: magic, - Version: 0, - Services: nodePeerService, - Timestamp: uint32(time.Now().UTC().Unix()), - Port: p, - Nonce: id, - UserAgent: []byte(ua), - StartHeight: h, - Relay: r, + Magic: magic, + Version: 0, + Timestamp: uint32(time.Now().UTC().Unix()), + Nonce: id, + UserAgent: []byte(ua), + Capabilities: c, } } @@ -61,25 +40,27 @@ func NewVersion(magic config.NetMode, id uint32, p uint16, ua string, h uint32, func (p *Version) DecodeBinary(br *io.BinReader) { p.Magic = config.NetMode(br.ReadU32LE()) p.Version = br.ReadU32LE() - p.Services = br.ReadU64LE() p.Timestamp = br.ReadU32LE() - p.Port = br.ReadU16LE() p.Nonce = br.ReadU32LE() p.UserAgent = br.ReadVarBytes() - p.StartHeight = br.ReadU32LE() - p.Relay = br.ReadBool() + br.ReadArray(&p.Capabilities, capability.MaxCapabilities) + unique := make(map[capability.Type]bool) + for _, cap := range p.Capabilities { + if unique[cap.Type] == false { + unique[cap.Type] = true + } else { + panic("capabilities with the same type are not allowed") + } + } } // EncodeBinary implements Serializable interface. func (p *Version) EncodeBinary(br *io.BinWriter) { br.WriteU32LE(uint32(p.Magic)) br.WriteU32LE(p.Version) - br.WriteU64LE(p.Services) br.WriteU32LE(p.Timestamp) - br.WriteU16LE(p.Port) br.WriteU32LE(p.Nonce) br.WriteVarBytes(p.UserAgent) - br.WriteU32LE(p.StartHeight) - br.WriteBool(p.Relay) + br.WriteArray(p.Capabilities) } diff --git a/pkg/network/payload/version_test.go b/pkg/network/payload/version_test.go index c8c8ccad6e..09cd9430b1 100644 --- a/pkg/network/payload/version_test.go +++ b/pkg/network/payload/version_test.go @@ -5,25 +5,44 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/internal/testserdes" + "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/stretchr/testify/assert" ) func TestVersionEncodeDecode(t *testing.T) { var magic config.NetMode = 56753 - var port uint16 = 3000 + var tcpPort uint16 = 3000 + var wsPort uint16 = 3001 var id uint32 = 13337 useragent := "/NEO:0.0.1/" var height uint32 = 100500 - var relay = true + var capabilities = []capability.Capability{ + { + Type: capability.TCPServer, + Data: &capability.Server{ + Port: tcpPort, + }, + }, + { + Type: capability.WSServer, + Data: &capability.Server{ + Port: wsPort, + }, + }, + { + Type: capability.FullNode, + Data: &capability.Node{ + StartHeight: height, + }, + }, + } - version := NewVersion(magic, id, port, useragent, height, relay) + version := NewVersion(magic, id, useragent, capabilities) versionDecoded := &Version{} testserdes.EncodeDecodeBinary(t, version, versionDecoded) assert.Equal(t, versionDecoded.Nonce, id) - assert.Equal(t, versionDecoded.Port, port) + assert.ElementsMatch(t, capabilities, versionDecoded.Capabilities) assert.Equal(t, versionDecoded.UserAgent, []byte(useragent)) - assert.Equal(t, versionDecoded.StartHeight, height) - assert.Equal(t, versionDecoded.Relay, relay) assert.Equal(t, version, versionDecoded) } diff --git a/pkg/network/server.go b/pkg/network/server.go index ff99d3d9a1..e54ea570b5 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -15,6 +15,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/atomic" @@ -347,13 +348,33 @@ func (s *Server) HandshakedPeersCount() int { // getVersionMsg returns current version message. func (s *Server) getVersionMsg() *Message { + _, portStr, err := net.SplitHostPort(s.transport.Address()) + if err != nil { + panic(err) + } + port, err := strconv.ParseUint(portStr, 10, 16) + if err != nil { + panic(err) + } + capabilities := []capability.Capability{ + { + Type: capability.FullNode, + Data: &capability.Node{ + StartHeight: s.chain.BlockHeight(), + }, + }, + { + Type: capability.TCPServer, + Data: &capability.Server{ + Port: uint16(port), + }, + }, + } payload := payload.NewVersion( s.Net, s.id, - s.Port, s.UserAgent, - s.chain.BlockHeight(), - s.Relay, + capabilities, ) return NewMessage(CMDVersion, payload) } @@ -836,7 +857,14 @@ func (s *Server) broadcastTxHashes(hs []util.Uint256) { // We need to filter out non-relaying nodes, so plain broadcast // functions don't fit here. s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, func(p Peer) bool { - return p.Handshaked() && p.Version().Relay + var isFullNode bool + for _, c := range p.Version().Capabilities { + if c.Type == capability.FullNode { + isFullNode = true + break + } + } + return p.Handshaked() && isFullNode }) } diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 19ed92b1d4..26f02736c7 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -4,6 +4,7 @@ import ( "net" "testing" + "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -11,22 +12,29 @@ import ( func TestSendVersion(t *testing.T) { var ( - s = newTestServer(t) + s = newTestServer(t, ServerConfig{Port: 3000, UserAgent: "/test/"}) p = newLocalPeer(t, s) ) - s.Port = 3000 - s.UserAgent = "/test/" - p.messageHandler = func(t *testing.T, msg *Message) { assert.Equal(t, CMDVersion, msg.Command) assert.IsType(t, msg.Payload, &payload.Version{}) version := msg.Payload.(*payload.Version) assert.NotZero(t, version.Nonce) - assert.Equal(t, uint16(3000), version.Port) - assert.Equal(t, uint64(1), version.Services) + assert.Equal(t, 2, len(version.Capabilities)) + assert.ElementsMatch(t, []capability.Capability{ + { + Type: capability.TCPServer, + Data: &capability.Server{ + Port: 3000, + }, + }, + { + Type: capability.FullNode, + Data: &capability.Node{StartHeight: 0}, + }, + }, version.Capabilities) assert.Equal(t, uint32(0), version.Version) assert.Equal(t, []byte("/test/"), version.UserAgent) - assert.Equal(t, uint32(0), version.StartHeight) } require.NoError(t, p.SendVersion()) @@ -35,7 +43,7 @@ func TestSendVersion(t *testing.T) { // Server should reply with a verack after receiving a valid version. func TestVerackAfterHandleVersionCmd(t *testing.T) { var ( - s = newTestServer(t) + s = newTestServer(t, ServerConfig{}) p = newLocalPeer(t, s) ) na, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:3000") @@ -45,7 +53,21 @@ func TestVerackAfterHandleVersionCmd(t *testing.T) { p.messageHandler = func(t *testing.T, msg *Message) { assert.Equal(t, CMDVerack, msg.Command) } - version := payload.NewVersion(0, 1337, 3000, "/NEO-GO/", 0, true) + capabilities := []capability.Capability{ + { + Type: capability.FullNode, + Data: &capability.Node{ + StartHeight: 0, + }, + }, + { + Type: capability.TCPServer, + Data: &capability.Server{ + Port: 3000, + }, + }, + } + version := payload.NewVersion(0, 1337, "/NEO-GO/", capabilities) require.NoError(t, s.handleVersionCmd(p, version)) } @@ -54,12 +76,11 @@ func TestVerackAfterHandleVersionCmd(t *testing.T) { // invalid version and disconnects the peer. func TestServerNotSendsVerack(t *testing.T) { var ( - s = newTestServer(t) + s = newTestServer(t, ServerConfig{Net: 56753}) p = newLocalPeer(t, s) p2 = newLocalPeer(t, s) ) s.id = 1 - s.Net = 56753 finished := make(chan struct{}) go func() { s.run() @@ -76,8 +97,22 @@ func TestServerNotSendsVerack(t *testing.T) { p2.netaddr = *na s.register <- p + capabilities := []capability.Capability{ + { + Type: capability.FullNode, + Data: &capability.Node{ + StartHeight: 0, + }, + }, + { + Type: capability.TCPServer, + Data: &capability.Server{ + Port: 3000, + }, + }, + } // identical id's - version := payload.NewVersion(56753, 1, 3000, "/NEO-GO/", 0, true) + version := payload.NewVersion(56753, 1, "/NEO-GO/", capabilities) err := s.handleVersionCmd(p, version) assert.NotNil(t, err) assert.Equal(t, errIdenticalID, err) @@ -104,7 +139,7 @@ func TestServerNotSendsVerack(t *testing.T) { func TestRequestHeaders(t *testing.T) { var ( - s = newTestServer(t) + s = newTestServer(t, ServerConfig{}) p = newLocalPeer(t, s) ) p.messageHandler = func(t *testing.T, msg *Message) { diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index c817eb3381..ba63f74729 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -9,6 +9,7 @@ import ( "time" "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/payload" "go.uber.org/zap" ) @@ -226,10 +227,16 @@ func (p *TCPPeer) handleQueues() { func (p *TCPPeer) StartProtocol() { var err error + var startHeight uint32 + for _, cap := range p.Version().Capabilities { + if cap.Type == capability.FullNode { + startHeight = cap.Data.(*capability.Node).StartHeight + } + } p.server.log.Info("started protocol", zap.Stringer("addr", p.RemoteAddr()), zap.ByteString("userAgent", p.Version().UserAgent), - zap.Uint32("startHeight", p.Version().StartHeight), + zap.Uint32("startHeight", startHeight), zap.Uint32("id", p.Version().Nonce)) p.server.discovery.RegisterGoodAddr(p.PeerAddr().String()) @@ -293,7 +300,13 @@ func (p *TCPPeer) HandleVersion(version *payload.Version) error { return errors.New("invalid handshake: already received Version") } p.version = version - p.lastBlockIndex = version.StartHeight + for _, cap := range version.Capabilities { + if cap.Type == capability.FullNode { + p.lastBlockIndex = cap.Data.(*capability.Node).StartHeight + break + } + } + p.handShake |= versionReceived return nil } @@ -352,7 +365,16 @@ func (p *TCPPeer) PeerAddr() net.Addr { if err != nil { return p.RemoteAddr() } - addrString := net.JoinHostPort(host, strconv.Itoa(int(p.version.Port))) + var port uint16 + for _, cap := range p.version.Capabilities { + if cap.Type == capability.TCPServer { + port = cap.Data.(*capability.Server).Port + } + } + if port == 0 { + return p.RemoteAddr() + } + addrString := net.JoinHostPort(host, strconv.Itoa(int(port))) tcpAddr, err := net.ResolveTCPAddr("tcp", addrString) if err != nil { return p.RemoteAddr() diff --git a/pkg/network/tcp_peer_test.go b/pkg/network/tcp_peer_test.go index 210a2a3317..b1cdfa9858 100644 --- a/pkg/network/tcp_peer_test.go +++ b/pkg/network/tcp_peer_test.go @@ -18,8 +18,8 @@ func connReadStub(conn net.Conn) { func TestPeerHandshake(t *testing.T) { server, client := net.Pipe() - tcpS := NewTCPPeer(server, newTestServer(t)) - tcpC := NewTCPPeer(client, newTestServer(t)) + tcpS := NewTCPPeer(server, newTestServer(t, ServerConfig{})) + tcpC := NewTCPPeer(client, newTestServer(t, ServerConfig{})) // Something should read things written into the pipe. go connReadStub(tcpS.conn) diff --git a/pkg/network/tcp_transport.go b/pkg/network/tcp_transport.go index 8195ca0391..b69836951f 100644 --- a/pkg/network/tcp_transport.go +++ b/pkg/network/tcp_transport.go @@ -41,16 +41,10 @@ func (t *TCPTransport) Dial(addr string, timeout time.Duration) error { // Accept implements the Transporter interface. func (t *TCPTransport) Accept() { - l, err := net.Listen("tcp", t.bindAddr) - if err != nil { - t.log.Panic("TCP listen error", zap.Error(err)) - return - } - - t.listener = l + t.addListener() for { - conn, err := l.Accept() + conn, err := t.listener.Accept() if err != nil { t.log.Warn("TCP accept error", zap.Error(err)) if t.isCloseError(err) { @@ -84,3 +78,20 @@ func (t *TCPTransport) Close() { func (t *TCPTransport) Proto() string { return "tcp" } + +// Address implements the Transporter interface. +func (t *TCPTransport) Address() string { + if t.listener == nil { + t.addListener() + } + return t.listener.Addr().String() +} + +func (t *TCPTransport) addListener() { + l, err := net.Listen("tcp", t.bindAddr) + if err != nil { + t.log.Panic("TCP listen error", zap.Error(err)) + } + + t.listener = l +} diff --git a/pkg/network/transport.go b/pkg/network/transport.go index 684f86717f..0f4d9e821d 100644 --- a/pkg/network/transport.go +++ b/pkg/network/transport.go @@ -8,5 +8,6 @@ type Transporter interface { Dial(addr string, timeout time.Duration) error Accept() Proto() string + Address() string Close() } diff --git a/pkg/rpc/client/rpc_test.go b/pkg/rpc/client/rpc_test.go index 3bcf13f7a4..f4f60a5e73 100644 --- a/pkg/rpc/client/rpc_test.go +++ b/pkg/rpc/client/rpc_test.go @@ -894,7 +894,7 @@ var rpcClientTestCases = map[string][]rpcClientTestCase{ invoke: func(c *Client) (interface{}, error) { return c.GetVersion() }, - serverResponse: `{"id":1,"jsonrpc":"2.0","result":{"port":20332,"nonce":2153672787,"useragent":"/NEO-GO:0.73.1-pre-273-ge381358/"}}`, + serverResponse: `{"id":1,"jsonrpc":"2.0","result":{"tcp_port":20332,"nonce":2153672787,"useragent":"/NEO-GO:0.73.1-pre-273-ge381358/"}}`, result: func(c *Client) interface{} { return &result.Version{ Port: uint16(20332), diff --git a/pkg/rpc/response/result/version.go b/pkg/rpc/response/result/version.go index 145d197b8a..5c80fed5e3 100644 --- a/pkg/rpc/response/result/version.go +++ b/pkg/rpc/response/result/version.go @@ -4,7 +4,7 @@ type ( // Version model used for reporting server version // info. Version struct { - Port uint16 `json:"port"` + Port uint16 `json:"tcp_port"` Nonce uint32 `json:"nonce"` UserAgent string `json:"useragent"` }