Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/public visors advertising #743

Merged
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
d6f9fa9
Advertise public visors.
taras-skycoin Nov 18, 2020
0f2dfe8
Fix linter
taras-skycoin Nov 18, 2020
c1f2a73
Add explicit return.
taras-skycoin Nov 18, 2020
c3ecaf2
Remove public_address field;Fix linter.
taras-skycoin Nov 19, 2020
9b5cb14
Add ReadOnly address resolver client.
taras-skycoin Nov 19, 2020
1ca9025
Add boolean field wich indicates if visor is on public address.
taras-skycoin Nov 19, 2020
741a4b4
Pass STCPR port to service-discover port.
taras-skycoin Nov 19, 2020
2a1cc24
Merge remote-tracking branch 'upstream/develop' into advertise_public…
taras-skycoin Nov 19, 2020
17d8596
Fix sleep delays.
taras-skycoin Nov 19, 2020
d840013
Merge branch 'develop' of github.com:skycoin/skywire into advertise_p…
Darkren Feb 1, 2021
4426259
Start merging old trusted visors with the new public visor concept
Darkren Feb 2, 2021
9eba538
Add func to fetch default network interface name for OSX
Darkren Feb 2, 2021
ce12e5b
Include local visor IPs in the update request for visor service
Darkren Feb 2, 2021
00fc277
Clean up the code
Darkren Feb 2, 2021
6568a24
Fix import cycle, fix visor test
Darkren Feb 2, 2021
0ed9947
Merge branch 'develop' of github.com:skycoin/skywire into feature/pub…
Darkren Mar 16, 2021
c9ab872
Merge branch 'develop' of github.com:skycoin/skywire into feature/pub…
Darkren Mar 28, 2021
9d74cd9
Add debug logs
Darkren Mar 28, 2021
6ffe9d3
Update deps
Darkren Mar 28, 2021
fb05117
Add debug logs
Darkren Mar 28, 2021
a90a855
Make service updater stop retrying if visor is unreachable
Darkren Mar 30, 2021
4cc790f
Properly stop ticker on updater exit
Darkren Mar 30, 2021
083cf99
Remove debug logs
Darkren Mar 30, 2021
c25e632
Merge branch 'develop' into feature/public-visors-advertising
i-hate-nicknames Apr 27, 2021
0f249ce
Run make format
i-hate-nicknames Apr 27, 2021
dbc25f6
Use errors.Is instead of string matching
i-hate-nicknames Apr 27, 2021
6af273c
Add exponential backoff to service update entry
i-hate-nicknames Apr 27, 2021
d7b23ae
Refactor entry update loop
i-hate-nicknames Apr 27, 2021
0ebc644
Move POST service discovery to a separate func
i-hate-nicknames Apr 27, 2021
10b8760
Add DefaultNetworkInterfaceIPs netutil function
i-hate-nicknames Apr 27, 2021
a78aea4
Update config for pulic visors
i-hate-nicknames Apr 28, 2021
0055a0c
Add IsPublicIp function to netutil
i-hate-nicknames Apr 28, 2021
25b7ca0
Check is_public field for self advertising
i-hate-nicknames Apr 28, 2021
380ad39
Move public visor connection to autoconnector
i-hate-nicknames Apr 28, 2021
cc98efa
Keep track of the connected services
i-hate-nicknames Apr 28, 2021
a24fd9c
Add exponential backoff for fetching services
i-hate-nicknames Apr 28, 2021
31498e0
Fix lint errors
i-hate-nicknames Apr 29, 2021
443e8f5
Check if transport is up
i-hate-nicknames May 5, 2021
263a27a
Add codereview fixes: todos, error handling
i-hate-nicknames May 7, 2021
a90627e
Refactor URL construction
i-hate-nicknames May 11, 2021
3974489
Add app disc refactoring todo
i-hate-nicknames May 11, 2021
b293588
Take visor self-advertising out of snet package
i-hate-nicknames May 11, 2021
5616644
Use transport manager in autoconnector
i-hate-nicknames May 11, 2021
207d063
Merge branch 'develop' into feature/public-visors-advertising
i-hate-nicknames May 12, 2021
f86bfb0
Add public visor advertisement as init module
i-hate-nicknames May 12, 2021
0a54576
Use transport labels in connecting to public visors
i-hate-nicknames May 13, 2021
e3fd124
Merge branch 'develop' into feature/public-visors-advertising
i-hate-nicknames May 13, 2021
c1017cc
Use canonical names for variables
i-hate-nicknames May 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added Char
Empty file.
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ golang.org/x/crypto v0.0.0-20201208171446-5f87f3452ae9 h1:sYNJzB4J8toYPQTM6pAkcm
golang.org/x/crypto v0.0.0-20201208171446-5f87f3452ae9/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.2.0 h1:KU7oHjnv3XNWfa5COkzUifxZmxp1TyI7ImMXqFxLwvQ=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
Expand Down
14 changes: 9 additions & 5 deletions internal/vpn/os_client_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ func DefaultNetworkGateway() (net.IP, error) {

var setupClientOnce sync.Once

func setupClientSysPrivileges() (suid int, err error) {
func setupClientSysPrivileges() (int, error) {
var err error
setupClientOnce.Do(func() {
var caps capability.Capabilities

Expand All @@ -64,15 +65,18 @@ func setupClientSysPrivileges() (suid int, err error) {

// set `CAP_NET_ADMIN` capability to needed caps sets.
caps.Set(capability.CAPS|capability.BOUNDS|capability.AMBIENT, capability.CAP_NET_ADMIN)
if e := caps.Apply(capability.CAPS | capability.BOUNDS | capability.AMBIENT); e != nil {
err = fmt.Errorf("failed to apply capabilties: %w", e)
err = caps.Apply(capability.CAPS | capability.BOUNDS | capability.AMBIENT)
if err != nil {
err = fmt.Errorf("failed to apply capabilties: %w", err)

return
}

// let child process keep caps sets from the parent, so we may do calls to
// system utilities with these caps.
if e := unix.Prctl(unix.PR_SET_KEEPCAPS, 1, 0, 0, 0); e != nil {
err = fmt.Errorf("failed to set PR_SET_KEEPCAPS: %w", e)
err = unix.Prctl(unix.PR_SET_KEEPCAPS, 1, 0, 0, 0)
if err != nil {
err = fmt.Errorf("failed to set PR_SET_KEEPCAPS: %w", err)
return
}
})
Expand Down
99 changes: 99 additions & 0 deletions pkg/servicedisc/autoconnect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package servicedisc

import (
"context"
"time"

"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/skycoin/src/util/logging"

"github.com/skycoin/skywire/internal/netutil"
)

const (
// PublicServiceDelay defines a delay before adding transports to public services.
PublicServiceDelay = 5 * time.Second

fetchServicesDelay = 2 * time.Second
)

// ConnectFn provides a way to connect to remote service
type ConnectFn func(context.Context, cipher.PubKey) error

// CheckConnFN checks that connection is alive
type CheckConnFN func(cipher.PubKey) bool

// Autoconnector continuously tries to connect to services
type Autoconnector interface {
Run(context.Context, ConnectFn, CheckConnFN) error
}

type autoconnector struct {
client *HTTPClient
maxConns int
log *logging.Logger
conns map[cipher.PubKey]struct{}
jdknives marked this conversation as resolved.
Show resolved Hide resolved
}

// MakeConnector returns a new connector that will try to connect to at most maxConns
// services
func MakeConnector(conf Config, maxConns int, log *logging.Logger) Autoconnector {
connector := &autoconnector{}
connector.client = NewClient(log, conf)
connector.maxConns = maxConns
connector.log = log
connector.conns = make(map[cipher.PubKey]struct{})
return connector
}

// Run implements Autoconnector interface
func (a *autoconnector) Run(ctx context.Context, connector ConnectFn, checker CheckConnFN) error {
retrier := netutil.NewRetrier(fetchServicesDelay, 0, 2)
for {
time.Sleep(PublicServiceDelay * 2)
i-hate-nicknames marked this conversation as resolved.
Show resolved Hide resolved
a.checkConns(checker)
if len(a.conns) == a.maxConns {
continue
}
var services []Service
fetch := func() (err error) {
// "return" services up from the closure
services, err = a.client.Services(ctx, a.maxConns)
if err != nil {
return err
}
return nil
}
if err := retrier.Do(fetch); err != nil {
a.log.Errorf("Cannot fetch services: %s", err)
return err
}

for _, service := range services {
pk := service.Addr.PubKey()
if _, ok := a.conns[pk]; ok {
continue
}
err := connector(ctx, pk)
if err != nil {
jdknives marked this conversation as resolved.
Show resolved Hide resolved
// ignore for now?
} else {
a.conns[pk] = struct{}{}
}
}
}
}

// check if existing connections are still active using checker
// and delete those that are not
func (a *autoconnector) checkConns(checker CheckConnFN) {
i-hate-nicknames marked this conversation as resolved.
Show resolved Hide resolved
toDelete := make([]cipher.PubKey, 0)
for pk := range a.conns {
if !checker(pk) {
toDelete = append(toDelete, pk)
}
}
for _, pk := range toDelete {
delete(a.conns, pk)
}
}
137 changes: 87 additions & 50 deletions pkg/servicedisc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,23 @@ import (
"fmt"
"io/ioutil"
"net/http"
"strings"
"strconv"
"sync"
"time"

"github.com/sirupsen/logrus"
"github.com/skycoin/dmsg/cipher"

"github.com/skycoin/skywire/internal/httpauth"
nu "github.com/skycoin/skywire/internal/netutil"
"github.com/skycoin/skywire/pkg/util/buildinfo"
"github.com/skycoin/skywire/pkg/util/netutil"
)

var (
// ErrVisorUnreachable is returned when visor is unreachable.
ErrVisorUnreachable = errors.New("visor is unreachable")
updateRetryDelay = 5 * time.Second
i-hate-nicknames marked this conversation as resolved.
Show resolved Hide resolved
)

// Config configures the HTTPClient.
Expand Down Expand Up @@ -98,8 +101,12 @@ func (c *HTTPClient) Auth(ctx context.Context) (*httpauth.Client, error) {
}

// Services calls 'GET /api/services'.
func (c *HTTPClient) Services(ctx context.Context) (out []Service, err error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.addr("/api/services", c.entry.Type), nil)
func (c *HTTPClient) Services(ctx context.Context, quantity int) (out []Service, err error) {
addr := c.addr("/api/services", c.entry.Type)
if quantity != 0 {
i-hate-nicknames marked this conversation as resolved.
Show resolved Hide resolved
addr += "&quantity=" + strconv.Itoa(quantity)
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, addr, nil)
if err != nil {
return nil, err
}
Expand All @@ -125,30 +132,56 @@ func (c *HTTPClient) Services(ctx context.Context) (out []Service, err error) {
return nil, &hErr
}
err = json.NewDecoder(resp.Body).Decode(&out)
return
return out, err
}

// UpdateEntry calls 'POST /api/services'.
func (c *HTTPClient) UpdateEntry(ctx context.Context) (*Service, error) {
auth, err := c.Auth(ctx)
// UpdateEntry calls 'POST /api/services', retrieves the entry
// and updates local field with the result
// if there are no ip addresses in the entry it also tries to fetch those
// from local config
func (c *HTTPClient) UpdateEntry(ctx context.Context) error {
c.entryMx.Lock()
defer c.entryMx.Unlock()
if c.conf.Type == ServiceTypeVisor && len(c.entry.LocalIPs) == 0 {
ips, err := netutil.DefaultNetworkInterfaceIPs()
if err != nil {
return err
}
c.entry.LocalIPs = make([]string, 0, len(ips))
for _, ip := range ips {
c.entry.LocalIPs = append(c.entry.LocalIPs, ip.String())
}
}
c.entry.Addr = NewSWAddr(c.conf.PK, c.conf.Port) // Just in case.

entry, err := c.postEntry(ctx)
if err != nil {
return nil, err
return err
}
c.entry = entry
return nil
}

c.entry.Addr = NewSWAddr(c.conf.PK, c.conf.Port) // Just in case.
// postEntry calls 'POST /api/services' and sends current service entry
// as the payload
func (c *HTTPClient) postEntry(ctx context.Context) (Service, error) {
auth, err := c.Auth(ctx)
if err != nil {
return Service{}, err
}

raw, err := json.Marshal(&c.entry)
if err != nil {
return nil, err
return Service{}, err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.addr("/api/services", ""), bytes.NewReader(raw))
if err != nil {
return nil, err
return Service{}, err
}

resp, err := auth.Do(req)
if err != nil {
return nil, err
return Service{}, err
}
if resp != nil {
defer func() {
Expand All @@ -161,19 +194,20 @@ func (c *HTTPClient) UpdateEntry(ctx context.Context) (*Service, error) {
if resp.StatusCode != http.StatusOK {
respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("read response body: %w", err)
return Service{}, fmt.Errorf("read response body: %w", err)
}

var hErr HTTPResponse
if err = json.Unmarshal(respBody, &hErr); err != nil {
return nil, err
return Service{}, err
}

return nil, hErr.Error
return Service{}, hErr.Error
}

err = json.NewDecoder(resp.Body).Decode(&c.entry)
return &c.entry, err
var entry Service
err = json.NewDecoder(resp.Body).Decode(&entry)
return entry, err
}

// DeleteEntry calls 'DELETE /api/services/{entry_addr}'.
Expand Down Expand Up @@ -214,51 +248,54 @@ func (c *HTTPClient) DeleteEntry(ctx context.Context) (err error) {
func (c *HTTPClient) UpdateLoop(ctx context.Context, updateInterval time.Duration) {
defer func() { _ = c.DeleteEntry(context.Background()) }() //nolint:errcheck

update := func() {
for {
c.entryMx.Lock()
entry, err := c.UpdateEntry(ctx)
c.entryMx.Unlock()

if err != nil {
if strings.Contains(err.Error(), ErrVisorUnreachable.Error()) {
c.log.Errorf("Unable to register visor as public trusted as it's unreachable from WAN")
return
}

c.log.WithError(err).Warn("Failed to update service entry in discovery. Retrying...")
time.Sleep(time.Second * 10) // TODO(evanlinjin): Exponential backoff.
continue
}

c.entryMx.Lock()
j, err := json.Marshal(entry)
c.entryMx.Unlock()

if err != nil {
panic(err)
}
ticker := time.NewTicker(updateInterval)
defer ticker.Stop()

c.log.WithField("entry", string(j)).Debug("Entry updated.")
for {
if err := c.Update(ctx); errors.Is(err, ErrVisorUnreachable) {
return
}
c.entryMx.Lock()
j, err := json.Marshal(c.entry)
c.entryMx.Unlock()
logger := c.log.WithField("entry", string(j))
if err == nil {
logger.Debug("Entry updated.")
} else {
logger.Errorf("Service returned malformed entry, error: %s", err)
return
}
}

// Run initial update.
update()

ticker := time.NewTicker(updateInterval)
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
update()
}
}
}

// Update calls 'POST /api/services' to update service discovery entry
// it performs exponential backoff in case of errors during update, unless
// the error is unrecoverable from
func (c *HTTPClient) Update(ctx context.Context) error {
retrier := nu.NewRetrier(updateRetryDelay, 0, 2).WithErrWhitelist(ErrVisorUnreachable)
run := func() error {
err := c.UpdateEntry(ctx)

if errors.Is(err, ErrVisorUnreachable) {
c.log.Errorf("Unable to register visor as public trusted as it's unreachable from WAN")
return err
}

if err != nil {
c.log.WithError(err).Warn("Failed to update service entry in discovery. Retrying...")
return err
}
return nil
}
return retrier.Do(run)
}

// UpdateStats updates the stats field of the internal service entry state.
func (c *HTTPClient) UpdateStats(stats Stats) {
c.entryMx.Lock()
Expand Down
11 changes: 6 additions & 5 deletions pkg/servicedisc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,12 @@ type Stats struct {

// Service represents a service entry in service-discovery.
type Service struct {
Addr SWAddr `json:"address"`
Type string `json:"type"`
Stats *Stats `json:"stats,omitempty"` // TODO: Have this implemented.
Geo *GeoLocation `json:"geo,omitempty"`
Version string `json:"version,omitempty"`
Addr SWAddr `json:"address"`
Type string `json:"type"`
Stats *Stats `json:"stats,omitempty"` // TODO: Have this implemented.
i-hate-nicknames marked this conversation as resolved.
Show resolved Hide resolved
Geo *GeoLocation `json:"geo,omitempty"`
Version string `json:"version,omitempty"`
LocalIPs []string `json:"local_ips,omitempty"`
}

// MarshalBinary implements encoding.BinaryMarshaller
Expand Down
Loading