Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix vpn client #901

Merged
merged 19 commits into from
Oct 6, 2021
76 changes: 45 additions & 31 deletions internal/vpn/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (

"github.com/skycoin/skywire/pkg/app"
"github.com/skycoin/skywire/pkg/app/appnet"
"github.com/skycoin/skywire/pkg/app/appserver"
"github.com/skycoin/skywire/pkg/routefinder/rfclient"
"github.com/skycoin/skywire/pkg/router"
"github.com/skycoin/skywire/pkg/routing"
"github.com/skycoin/skywire/pkg/skyenv"
skynetutil "github.com/skycoin/skywire/pkg/util/netutil"
Expand All @@ -34,7 +37,6 @@ type Client struct {
log *logrus.Logger
cfg ClientConfig
appCl *app.Client
r *netutil.Retrier
directIPSMu sync.Mutex
directIPs []net.IP
defaultGateway net.IP
Expand Down Expand Up @@ -109,13 +111,7 @@ func NewClient(cfg ClientConfig, appCl *app.Client) (*Client, error) {
directIPs = append(directIPs, utIP)
}

const (
serverDialInitBO = 1 * time.Second
serverDialMaxBO = 10 * time.Second
)

log := logrus.New()
r := netutil.NewRetrier(log, serverDialInitBO, serverDialMaxBO, 0, 1)

defaultGateway, err := DefaultNetworkGateway()
if err != nil {
Expand All @@ -128,7 +124,6 @@ func NewClient(cfg ClientConfig, appCl *app.Client) (*Client, error) {
log: log,
cfg: cfg,
appCl: appCl,
r: r,
directIPs: filterOutEqualIPs(directIPs),
defaultGateway: defaultGateway,
closeC: make(chan struct{}),
Expand Down Expand Up @@ -188,16 +183,35 @@ func (c *Client) Serve() error {

c.setAppStatus(ClientStatusConnecting)

r := netutil.NewDefaultRetrier(c.log)
errNoTransportFound := appserver.RPCErr{
Err: router.ErrNoTransportFound.Error(),
}

errTransportNotFound := appserver.RPCErr{
Err: rfclient.ErrTransportNotFound.Error(),
}

r := netutil.NewRetrier(c.log, netutil.DefaultInitBackoff, netutil.DefaultMaxBackoff, 3, netutil.DefaultFactor).
WithErrWhitelist(errHandshakeStatusForbidden, errHandshakeStatusInternalError, errHandshakeNoFreeIPs,
errHandshakeStatusBadRequest, errNoTransportFound, errTransportNotFound)

err := r.Do(context.Background(), func() error {
if c.isClosed() {
return nil
}

if err := c.dialServeConn(); err != nil {
c.setAppStatus(ClientStatusReconnecting)
fmt.Println("Connection broke, reconnecting...")
return fmt.Errorf("dialServeConn: %w", err)
switch err {
case errHandshakeStatusForbidden, errHandshakeStatusInternalError, errHandshakeNoFreeIPs,
jdknives marked this conversation as resolved.
Show resolved Hide resolved
errHandshakeStatusBadRequest, errNoTransportFound, errTransportNotFound:
c.setAppError(err)
return err
default:
c.setAppStatus(ClientStatusReconnecting)
c.setAppError(errTimeout)
fmt.Println("\nConnection broke, reconnecting...")
return fmt.Errorf("dialServeConn: %w", err)
}
}

return nil
Expand Down Expand Up @@ -326,7 +340,8 @@ func (c *Client) setupTUN(tunIP, tunGateway net.IP) error {
func (c *Client) serveConn(conn net.Conn) error {
tunIP, tunGateway, err := c.shakeHands(conn)
if err != nil {
return fmt.Errorf("error during client/server handshake: %w", err)
fmt.Printf("error during client/server handshake: %s", err)
return err
}

fmt.Printf("Performed handshake with %s\n", conn.RemoteAddr())
Expand Down Expand Up @@ -429,7 +444,8 @@ func (c *Client) serveConn(conn net.Conn) error {
func (c *Client) dialServeConn() error {
conn, err := c.dialServer(c.appCl, c.cfg.ServerPK)
if err != nil {
return fmt.Errorf("error connecting to VPN server: %w", err)
fmt.Printf("error connecting to VPN server: %s", err)
return err
}

fmt.Printf("Dialed %s\n", conn.RemoteAddr())
Expand All @@ -445,7 +461,8 @@ func (c *Client) dialServeConn() error {
}

if err := c.serveConn(conn); err != nil {
return fmt.Errorf("error serving app conn: %w", err)
fmt.Printf("error serving app conn: %s", err)
return err
}

return nil
Expand Down Expand Up @@ -663,7 +680,7 @@ func (c *Client) shakeHands(conn net.Conn) (TUNIP, TUNGateway net.IP, err error)
fmt.Printf("Got server hello: %v", sHello)

if sHello.Status != HandshakeStatusOK {
return nil, nil, fmt.Errorf("got status %d (%s) from the server", sHello.Status, sHello.Status)
return nil, nil, sHello.Status.getError()
}

return sHello.TUNIP, sHello.TUNGateway, nil
Expand All @@ -684,22 +701,13 @@ func (c *Client) dialServer(appCl *app.Client, pk cipher.PubKey) (net.Conn, erro
)

var conn net.Conn
err := c.r.Do(context.Background(), func() error {
var err error
conn, err = appCl.Dial(appnet.Addr{
Net: netType,
PubKey: pk,
Port: vpnPort,
})

if c.isClosed() {
// in this case client got closed, we return no error,
// so that retrier could stop gracefully
return nil
}

return err
var err error
conn, err = appCl.Dial(appnet.Addr{
Net: netType,
PubKey: pk,
Port: vpnPort,
})

if err != nil {
return nil, err
}
Expand All @@ -719,6 +727,12 @@ func (c *Client) setAppStatus(status ClientStatus) {
}
}

func (c *Client) setAppError(appErr error) {
if err := c.appCl.SetError(appErr.Error()); err != nil {
fmt.Printf("Failed to set error %v: %v\n", appErr, err)
}
}

func (c *Client) isClosed() bool {
select {
case <-c.closeC:
Expand Down
7 changes: 6 additions & 1 deletion internal/vpn/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,10 @@ package vpn
import "errors"

var (
errCouldFindDefaultNetworkGateway = errors.New("could not find default network gateway")
errCouldFindDefaultNetworkGateway = errors.New("Could not find default network gateway")
errHandshakeStatusForbidden = errors.New("Password didn't match")
errHandshakeStatusInternalError = errors.New("Internal server error")
errHandshakeNoFreeIPs = errors.New("No free IPs left to serve")
errHandshakeStatusBadRequest = errors.New("Request was malformed")
errTimeout = errors.New("Internal error: Timeout")
)
19 changes: 19 additions & 0 deletions internal/vpn/handshake_status.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package vpn

import "errors"

// HandshakeStatus is a status of Client/Server handshake.
type HandshakeStatus int

Expand Down Expand Up @@ -32,3 +34,20 @@ func (hs HandshakeStatus) String() string {
return "Unknown code"
}
}

func (hs HandshakeStatus) getError() error {
jdknives marked this conversation as resolved.
Show resolved Hide resolved
switch hs {
case HandshakeStatusOK:
return nil
case HandshakeStatusBadRequest:
return errHandshakeStatusBadRequest
case HandshakeNoFreeIPs:
return errHandshakeNoFreeIPs
case HandshakeStatusInternalError:
return errHandshakeStatusInternalError
case HandshakeStatusForbidden:
return errHandshakeStatusForbidden
default:
return errors.New("Unknown error code")
}
}
16 changes: 15 additions & 1 deletion pkg/app/appserver/mock_rpc_ingress_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 20 additions & 1 deletion pkg/app/appserver/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type Proc struct {

statusMx sync.RWMutex
status string
errMx sync.RWMutex
err string
}

// NewProc constructs `Proc`.
Expand Down Expand Up @@ -170,7 +172,8 @@ func (p *Proc) Start() error {
// here will definitely be an error notifying that the process
// is already stopped. We do this to remove proc from the manager,
// therefore giving the correct app status to hypervisor.
_ = p.m.Stop(p.appName) //nolint:errcheck
_ = p.m.SetError(p.appName, p.err) //nolint:errcheck
_ = p.m.Stop(p.appName) //nolint:errcheck
}()

select {
Expand Down Expand Up @@ -285,6 +288,22 @@ func (p *Proc) DetailedStatus() string {
return p.status
}

// SetError sets proc's detailed status error.
func (p *Proc) SetError(appErr string) {
p.errMx.Lock()
defer p.errMx.Unlock()

p.err = appErr
}

// Error gets proc's error.
func (p *Proc) Error() string {
p.errMx.RLock()
defer p.errMx.RUnlock()

return p.err
}

// ConnectionSummary sums up the connection stats.
type ConnectionSummary struct {
IsAlive bool `json:"is_alive"`
Expand Down
28 changes: 25 additions & 3 deletions pkg/app/appserver/proc_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type ProcManager interface {
io.Closer
Start(conf appcommon.ProcConfig) (appcommon.ProcID, error)
ProcByName(appName string) (*Proc, bool)
SetError(appName, status string) error
ErrorByName(appName string) (string, bool)
Stop(appName string) error
Wait(appName string) error
Range(next func(appName string, proc *Proc) bool)
Expand All @@ -61,6 +63,7 @@ type procManager struct {
procs map[string]*Proc
procsByKey map[appcommon.ProcKey]*Proc

errors map[string]string
// event broadcaster: broadcasts events to apps
eb *appevent.Broadcaster

Expand Down Expand Up @@ -93,6 +96,7 @@ func NewProcManager(mLog *logging.MasterLogger, discF *appdisc.Factory, eb *appe
discF: discF,
procs: make(map[string]*Proc),
procsByKey: make(map[appcommon.ProcKey]*Proc),
errors: make(map[string]string),
eb: eb,
done: make(chan struct{}),
}
Expand Down Expand Up @@ -209,7 +213,7 @@ func (m *procManager) Start(conf appcommon.ProcConfig) (appcommon.ProcID, error)

return 0, err
}

delete(m.errors, conf.AppName)
return appcommon.ProcID(proc.cmd.Process.Pid), nil
}

Expand All @@ -221,6 +225,14 @@ func (m *procManager) ProcByName(appName string) (*Proc, bool) {
return proc, ok
}

func (m *procManager) ErrorByName(appName string) (string, bool) {
m.mx.RLock()
defer m.mx.RUnlock()

err, ok := m.errors[appName]
return err, ok
}

// Stop stops the application.
func (m *procManager) Stop(name string) error {
p, err := m.pop(name)
Expand All @@ -245,14 +257,14 @@ func (m *procManager) Wait(name string) error {
err = fmt.Errorf("failed to run app executable %s: %w", name, err)
}

if _, err := m.pop(name); err != nil {
if _, err := m.pop(name); err != nil { //nolint:errcheck
m.log.Debugf("Remove app <%v>: %v", name, err)
}

return err
}

_, err = m.pop(name)
_, err = m.pop(name) //nolint:errcheck

return err
}
Expand Down Expand Up @@ -310,6 +322,16 @@ func (m *procManager) DetailedStatus(appName string) (string, error) {
return p.DetailedStatus(), nil
}

// SetError sets error `appErr` for app `appName`.
func (m *procManager) SetError(appName, appErr string) error {
m.mx.Lock()
defer m.mx.Unlock()

m.errors[appName] = appErr

return nil
}

// ConnectionsSummary gets connections info for the app `appName`.
func (m *procManager) ConnectionsSummary(appName string) ([]ConnectionSummary, error) {
p, err := m.get(appName)
Expand Down
4 changes: 2 additions & 2 deletions pkg/app/appserver/proc_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ func TestProcManager_ProcByName(t *testing.T) {

appName := "app"

_, ok = m.ProcByName(appName)
_, ok = m.ProcByName(appName) //nolint:errcheck
require.False(t, ok)

m.mx.Lock()
m.procs[appName] = nil
m.mx.Unlock()

_, ok = m.ProcByName(appName)
_, ok = m.ProcByName(appName) //nolint:errcheck
require.True(t, ok)
}

Expand Down
Loading