Skip to content

Commit

Permalink
protocol: add capabilities to address payload
Browse files Browse the repository at this point in the history
Part of #871
  • Loading branch information
AnnaShaleva committed May 27, 2020
1 parent 9841df7 commit bb0d9a9
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 47 deletions.
31 changes: 21 additions & 10 deletions pkg/network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package network
import (
"sync"
"time"

"github.com/nspcc-dev/neo-go/pkg/network/capability"
)

const (
Expand All @@ -18,12 +20,18 @@ type Discoverer interface {
PoolCount() int
RequestRemote(int)
RegisterBadAddr(string)
RegisterGoodAddr(string)
RegisterGoodAddr(string, capability.Capabilities)
RegisterConnectedAddr(string)
UnregisterConnectedAddr(string)
UnconnectedPeers() []string
BadPeers() []string
GoodPeers() []string
GoodPeers() []AddressWithCapabilities
}

// AddressWithCapabilities represents node address with its capabilities
type AddressWithCapabilities struct {
Address string
Capabilities capability.Capabilities
}

// DefaultDiscovery default implementation of the Discoverer interface.
Expand All @@ -34,7 +42,7 @@ type DefaultDiscovery struct {
dialTimeout time.Duration
badAddrs map[string]bool
connectedAddrs map[string]bool
goodAddrs map[string]bool
goodAddrs map[string]capability.Capabilities
unconnectedAddrs map[string]int
isDead bool
requestCh chan int
Expand All @@ -48,7 +56,7 @@ func NewDefaultDiscovery(dt time.Duration, ts Transporter) *DefaultDiscovery {
dialTimeout: dt,
badAddrs: make(map[string]bool),
connectedAddrs: make(map[string]bool),
goodAddrs: make(map[string]bool),
goodAddrs: make(map[string]capability.Capabilities),
unconnectedAddrs: make(map[string]int),
requestCh: make(chan int),
pool: make(chan string, maxPoolSize),
Expand Down Expand Up @@ -135,21 +143,24 @@ func (d *DefaultDiscovery) BadPeers() []string {

// GoodPeers returns all addresses of known good peers (that at least once
// succeeded handshaking with us).
func (d *DefaultDiscovery) GoodPeers() []string {
func (d *DefaultDiscovery) GoodPeers() []AddressWithCapabilities {
d.lock.RLock()
addrs := make([]string, 0, len(d.goodAddrs))
for addr := range d.goodAddrs {
addrs = append(addrs, addr)
addrs := make([]AddressWithCapabilities, 0, len(d.goodAddrs))
for addr, cap := range d.goodAddrs {
addrs = append(addrs, AddressWithCapabilities{
Address: addr,
Capabilities: cap,
})
}
d.lock.RUnlock()
return addrs
}

// RegisterGoodAddr registers good known connected address that passed
// handshake successfully.
func (d *DefaultDiscovery) RegisterGoodAddr(s string) {
func (d *DefaultDiscovery) RegisterGoodAddr(s string, c capability.Capabilities) {
d.lock.Lock()
d.goodAddrs[s] = true
d.goodAddrs[s] = c
d.lock.Unlock()
}

Expand Down
20 changes: 18 additions & 2 deletions pkg/network/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/nspcc-dev/neo-go/pkg/network/capability"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -79,9 +80,24 @@ func TestDefaultDiscoverer(t *testing.T) {

// Registered good addresses should end up in appropriate set.
for _, addr := range set1 {
d.RegisterGoodAddr(addr)
d.RegisterGoodAddr(addr, capability.Capabilities{
{
Type: capability.FullNode,
Data: &capability.Node{StartHeight: 123},
},
})
}
gAddrWithCap := d.GoodPeers()
gAddrs := make([]string, len(gAddrWithCap))
for i, addr := range gAddrWithCap {
require.Equal(t, capability.Capabilities{
{
Type: capability.FullNode,
Data: &capability.Node{StartHeight: 123},
},
}, addr.Capabilities)
gAddrs[i] = addr.Address
}
gAddrs := d.GoodPeers()
sort.Strings(gAddrs)
assert.Equal(t, 0, d.PoolCount())
assert.Equal(t, 0, len(d.UnconnectedPeers()))
Expand Down
24 changes: 13 additions & 11 deletions pkg/network/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync/atomic"
"testing"

"github.com/nspcc-dev/neo-go/pkg/network/capability"

"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/mempool"
Expand Down Expand Up @@ -151,17 +153,17 @@ func (chain testChain) VerifyTx(*transaction.Transaction, *block.Block) error {

type testDiscovery struct{}

func (d testDiscovery) BackFill(addrs ...string) {}
func (d testDiscovery) Close() {}
func (d testDiscovery) PoolCount() int { return 0 }
func (d testDiscovery) RegisterBadAddr(string) {}
func (d testDiscovery) RegisterGoodAddr(string) {}
func (d testDiscovery) RegisterConnectedAddr(string) {}
func (d testDiscovery) UnregisterConnectedAddr(string) {}
func (d testDiscovery) UnconnectedPeers() []string { return []string{} }
func (d testDiscovery) RequestRemote(n int) {}
func (d testDiscovery) BadPeers() []string { return []string{} }
func (d testDiscovery) GoodPeers() []string { return []string{} }
func (d testDiscovery) BackFill(addrs ...string) {}
func (d testDiscovery) Close() {}
func (d testDiscovery) PoolCount() int { return 0 }
func (d testDiscovery) RegisterBadAddr(string) {}
func (d testDiscovery) RegisterGoodAddr(string, capability.Capabilities) {}
func (d testDiscovery) RegisterConnectedAddr(string) {}
func (d testDiscovery) UnregisterConnectedAddr(string) {}
func (d testDiscovery) UnconnectedPeers() []string { return []string{} }
func (d testDiscovery) RequestRemote(n int) {}
func (d testDiscovery) BadPeers() []string { return []string{} }
func (d testDiscovery) GoodPeers() []AddressWithCapabilities { return []AddressWithCapabilities{} }

var defaultMessageHandler = func(t *testing.T, msg *Message) {}

Expand Down
40 changes: 24 additions & 16 deletions pkg/network/payload/address.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
package payload

import (
"errors"
"net"
"strconv"
"time"

"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/network/capability"
)

// AddressAndTime payload.
type AddressAndTime struct {
Timestamp uint32
Services uint64
IP [16]byte
Port uint16
Timestamp uint32
IP [16]byte
Capabilities capability.Capabilities
}

// NewAddressAndTime creates a new AddressAndTime object.
func NewAddressAndTime(e *net.TCPAddr, t time.Time) *AddressAndTime {
func NewAddressAndTime(e *net.TCPAddr, t time.Time, c capability.Capabilities) *AddressAndTime {
aat := AddressAndTime{
Timestamp: uint32(t.UTC().Unix()),
Services: 1,
Port: uint16(e.Port),
Timestamp: uint32(t.UTC().Unix()),
Capabilities: c,
}
copy(aat.IP[:], e.IP)
return &aat
Expand All @@ -30,26 +30,34 @@ func NewAddressAndTime(e *net.TCPAddr, t time.Time) *AddressAndTime {
// DecodeBinary implements Serializable interface.
func (p *AddressAndTime) DecodeBinary(br *io.BinReader) {
p.Timestamp = br.ReadU32LE()
p.Services = br.ReadU64LE()
br.ReadBytes(p.IP[:])
p.Port = br.ReadU16BE()
p.Capabilities.DecodeBinary(br)
}

// EncodeBinary implements Serializable interface.
func (p *AddressAndTime) EncodeBinary(bw *io.BinWriter) {
bw.WriteU32LE(p.Timestamp)
bw.WriteU64LE(p.Services)
bw.WriteBytes(p.IP[:])
bw.WriteU16BE(p.Port)
p.Capabilities.EncodeBinary(bw)
}

// IPPortString makes a string from IP and port specified.
func (p *AddressAndTime) IPPortString() string {
// GetTCPAddress makes a string from IP and port specified in TCPCapability.
// It returns an error if there's no such capability.
func (p *AddressAndTime) GetTCPAddress() (string, error) {
var netip = make(net.IP, 16)

copy(netip, p.IP[:])
port := strconv.Itoa(int(p.Port))
return netip.String() + ":" + port
port := -1
for _, cap := range p.Capabilities {
if cap.Type == capability.TCPServer {
port = int(cap.Data.(*capability.Server).Port)
break
}
}
if port == -1 {
return "", errors.New("no TCP capability found")
}
return net.JoinHostPort(netip.String(), strconv.Itoa(port)), nil
}

// AddressList is a list with AddrAndTime.
Expand Down
21 changes: 18 additions & 3 deletions pkg/network/payload/address_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,31 @@ import (
"time"

"github.com/nspcc-dev/neo-go/pkg/internal/testserdes"
"github.com/nspcc-dev/neo-go/pkg/network/capability"
"github.com/stretchr/testify/assert"
)

func TestEncodeDecodeAddress(t *testing.T) {
var (
e, _ = net.ResolveTCPAddr("tcp", "127.0.0.1:2000")
ts = time.Now()
addr = NewAddressAndTime(e, ts)
addr = NewAddressAndTime(e, ts, capability.Capabilities{
{
Type: capability.TCPServer,
Data: &capability.Server{Port: uint16(e.Port)},
},
})
)

assert.Equal(t, ts.UTC().Unix(), int64(addr.Timestamp))
aatip := make(net.IP, 16)
copy(aatip, addr.IP[:])
assert.Equal(t, e.IP, aatip)
assert.Equal(t, e.Port, int(addr.Port))
assert.Equal(t, 1, len(addr.Capabilities))
assert.Equal(t, capability.Capability{
Type: capability.TCPServer,
Data: &capability.Server{Port: uint16(e.Port)},
}, addr.Capabilities[0])

testserdes.EncodeDecodeBinary(t, addr, new(AddressAndTime))
}
Expand All @@ -31,7 +41,12 @@ func TestEncodeDecodeAddressList(t *testing.T) {
addrList := NewAddressList(int(lenList))
for i := 0; i < int(lenList); i++ {
e, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("127.0.0.1:200%d", i))
addrList.Addrs[i] = NewAddressAndTime(e, time.Now())
addrList.Addrs[i] = NewAddressAndTime(e, time.Now(), capability.Capabilities{
{
Type: capability.TCPServer,
Data: &capability.Server{Port: 123},
},
})
}

testserdes.EncodeDecodeBinary(t, addrList, new(AddressList))
Expand Down
9 changes: 6 additions & 3 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,10 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
// handleAddrCmd will process received addresses.
func (s *Server) handleAddrCmd(p Peer, addrs *payload.AddressList) error {
for _, a := range addrs.Addrs {
s.discovery.BackFill(a.IPPortString())
addr, err := a.GetTCPAddress()
if err != nil {
s.discovery.BackFill(addr)
}
}
return nil
}
Expand All @@ -657,8 +660,8 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
ts := time.Now()
for i, addr := range addrs {
// we know it's a good address, so it can't fail
netaddr, _ := net.ResolveTCPAddr("tcp", addr)
alist.Addrs[i] = payload.NewAddressAndTime(netaddr, ts)
netaddr, _ := net.ResolveTCPAddr("tcp", addr.Address)
alist.Addrs[i] = payload.NewAddressAndTime(netaddr, ts, addr.Capabilities)
}
return p.EnqueueP2PMessage(NewMessage(CMDAddr, alist))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestSendVersion(t *testing.T) {
)
// we need to set listener at least to handle dynamic port correctly
go s.transport.Accept()
require.Eventually(t, func() bool { return s.transport.Address() != "" }, time.Second, 10*time.Millisecond)
require.Eventually(t, func() bool { return s.transport.Address() != "" }, time.Second, 100*time.Millisecond)
p.messageHandler = func(t *testing.T, msg *Message) {
// listener is already set, so Address() gives us proper address with port
_, p, err := net.SplitHostPort(s.transport.Address())
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/tcp_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (p *TCPPeer) StartProtocol() {
zap.Uint32("startHeight", p.lastBlockIndex),
zap.Uint32("id", p.Version().Nonce))

p.server.discovery.RegisterGoodAddr(p.PeerAddr().String())
p.server.discovery.RegisterGoodAddr(p.PeerAddr().String(), p.version.Capabilities)
if p.server.chain.HeaderHeight() < p.LastBlockIndex() {
err = p.server.requestHeaders(p)
if err != nil {
Expand Down

0 comments on commit bb0d9a9

Please sign in to comment.