Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace unix pipes with TCP sockets #231

Merged
merged 3 commits into from
Mar 23, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/skywire-cli/commands/visor/gen-config.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func defaultConfig() *visor.Config {

conf.Interfaces.RPCAddress = "localhost:3435"

conf.AppServerSockFile = "/tmp/visor_" + pk.Hex() + ".sock"
conf.AppServerPort = 5505

return conf
}
Expand Down
4 changes: 1 addition & 3 deletions cmd/skywire-visor/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,7 @@ func (cfg *runCfg) runVisor() *runCfg {
time.Sleep(startDelay)

if cfg.conf.DmsgPty != nil {
err = visor.UnlinkSocketFiles(cfg.conf.AppServerSockFile, cfg.conf.DmsgPty.CLIAddr)
} else {
err = visor.UnlinkSocketFiles(cfg.conf.AppServerSockFile)
err = visor.UnlinkSocketFiles(cfg.conf.DmsgPty.CLIAddr)
Darkren marked this conversation as resolved.
Show resolved Hide resolved
}

if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions pkg/app/appcommon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package appcommon

// Config defines configuration parameters for `Proc`.
type Config struct {
Name string `json:"name"`
SockFilePath string `json:"sock_file_path"`
VisorPK string `json:"visor_pk"`
BinaryDir string `json:"binary_dir"`
WorkDir string `json:"work_dir"`
Name string `json:"name"`
ServerHost string `json:"server_host"`
ServerPort uint `json:"server_port"`
Darkren marked this conversation as resolved.
Show resolved Hide resolved
VisorPK string `json:"visor_pk"`
BinaryDir string `json:"binary_dir"`
WorkDir string `json:"work_dir"`
}
6 changes: 4 additions & 2 deletions pkg/app/appcommon/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package appcommon
const (
// EnvAppKey is a name for env arg containing skywire application key.
EnvAppKey = "APP_KEY"
// EnvSockFile is a name for env arg containing unix socket file name.
EnvSockFile = "SW_UNIX"
// EnvServerHost is a name for env arg containing app server host.
EnvServerHost = "APP_HOST"
// EnvServerPort is a name for env arg containing app server port.
EnvServerPort = "APP_PORT"
// EnvVisorPK is a name for env arg containing public key of visor.
EnvVisorPK = "VISOR_PK"
)
12 changes: 7 additions & 5 deletions pkg/app/appserver/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,16 @@ func NewProc(log *logging.Logger, c appcommon.Config, args []string, stdout, std
binaryPath := getBinaryPath(c.BinaryDir, c.Name)

const (
appKeyEnvFormat = appcommon.EnvAppKey + "=%s"
sockFileEnvFormat = appcommon.EnvSockFile + "=%s"
visorPKEnvFormat = appcommon.EnvVisorPK + "=%s"
appKeyEnvFormat = appcommon.EnvAppKey + "=%s"
serverHostEnvFormat = appcommon.EnvServerHost + "=%s"
serverPortEnvFormat = appcommon.EnvServerPort + "=%d"
visorPKEnvFormat = appcommon.EnvVisorPK + "=%s"
)

env := make([]string, 0, 3)
env := make([]string, 0, 4)
env = append(env, fmt.Sprintf(appKeyEnvFormat, key))
env = append(env, fmt.Sprintf(sockFileEnvFormat, c.SockFilePath))
env = append(env, fmt.Sprintf(serverHostEnvFormat, c.ServerHost))
env = append(env, fmt.Sprintf(serverPortEnvFormat, c.ServerPort))
env = append(env, fmt.Sprintf(visorPKEnvFormat, c.VisorPK))

cmd := exec.Command(binaryPath, args...) // nolint:gosec
Expand Down
11 changes: 8 additions & 3 deletions pkg/app/appserver/proc_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@ import (
"github.com/stretchr/testify/require"
)

const (
testHost = ""
testPort = uint(5505)
)

func TestProcManager_Exists(t *testing.T) {
srv := New(nil, "")
srv := New(nil, testHost, testPort)

mIfc := NewProcManager(logging.MustGetLogger("proc_manager"), srv)
m, ok := mIfc.(*procManager)
Expand All @@ -27,7 +32,7 @@ func TestProcManager_Exists(t *testing.T) {
}

func TestProcManager_Range(t *testing.T) {
srv := New(nil, "")
srv := New(nil, testHost, testPort)

mIfc := NewProcManager(logging.MustGetLogger("proc_manager"), srv)
m, ok := mIfc.(*procManager)
Expand Down Expand Up @@ -57,7 +62,7 @@ func TestProcManager_Range(t *testing.T) {
}

func TestProcManager_Pop(t *testing.T) {
srv := New(nil, "")
srv := New(nil, testHost, testPort)

mIfc := NewProcManager(logging.MustGetLogger("proc_manager"), srv)
m, ok := mIfc.(*procManager)
Expand Down
26 changes: 14 additions & 12 deletions pkg/app/appserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,23 @@ import (

// Server is a server for app/visor communication.
type Server struct {
log *logging.Logger
lis net.Listener
sockFile string
rpcS *rpc.Server
done sync.WaitGroup
stopCh chan struct{}
log *logging.Logger
lis net.Listener
host string
port uint
rpcS *rpc.Server
done sync.WaitGroup
stopCh chan struct{}
}

// New constructs server.
func New(log *logging.Logger, sockFile string) *Server {
func New(log *logging.Logger, host string, port uint) *Server {
return &Server{
log: log,
sockFile: sockFile,
rpcS: rpc.NewServer(),
stopCh: make(chan struct{}),
log: log,
host: host,
port: port,
rpcS: rpc.NewServer(),
stopCh: make(chan struct{}),
}
}

Expand All @@ -42,7 +44,7 @@ func (s *Server) Register(appKey appcommon.Key) error {

// ListenAndServe starts listening for incoming app connections via unix socket.
func (s *Server) ListenAndServe() error {
l, err := net.Listen("unix", s.sockFile)
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", s.host, s.port))
if err != nil {
return err
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/app/appserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,26 @@ import (
)

const (
sockFile = "/tmp/app.sock"
testHost = ""
testPort = uint(5505)
sleepDelay = 500 * time.Millisecond
)

func TestServer_ListenAndServe(t *testing.T) {
l := logging.MustGetLogger("app_server")

s := appserver.New(l, sockFile)
s := appserver.New(l, testHost, testPort)

appKey := appcommon.GenerateAppKey()

require.NoError(t, s.Register(appKey))

visorPK, _ := cipher.GenerateKeyPair()
clientConfig := app.ClientConfig{
VisorPK: visorPK,
SockFile: sockFile,
AppKey: appKey,
VisorPK: visorPK,
ServerHost: testHost,
ServerPort: testPort,
AppKey: appKey,
}

errCh := make(chan error, 1)
Expand Down
36 changes: 24 additions & 12 deletions pkg/app/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net"
"net/rpc"
"os"
"strconv"
"strings"

"github.com/SkycoinProject/dmsg/cipher"
Expand All @@ -22,17 +23,20 @@ var (
ErrVisorPKNotProvided = errors.New("visor PK is not provided")
// ErrVisorPKInvalid is returned when the visor PK is invalid.
ErrVisorPKInvalid = errors.New("visor PK is invalid")
// ErrSockFileNotProvided is returned when the sock file is not provided.
ErrSockFileNotProvided = errors.New("sock file is not provided")
// ErrServerPortNotProvided is returned when the app server port is not provided.
ErrServerPortNotProvided = errors.New("server port is not provided")
// ErrServerPortInvalid is returned when the the app server port is invalid.
ErrServerPortInvalid = errors.New("server port is invalid")
// ErrAppKeyNotProvided is returned when the app key is not provided.
ErrAppKeyNotProvided = errors.New("app key is not provided")
)

// ClientConfig is a configuration for `Client`.
type ClientConfig struct {
VisorPK cipher.PubKey
SockFile string
AppKey appcommon.Key
VisorPK cipher.PubKey
ServerHost string
ServerPort uint
AppKey appcommon.Key
}

// ClientConfigFromEnv creates client config from the ENV args.
Expand All @@ -42,9 +46,16 @@ func ClientConfigFromEnv() (ClientConfig, error) {
return ClientConfig{}, ErrAppKeyNotProvided
}

sockFile := os.Getenv(appcommon.EnvSockFile)
if sockFile == "" {
return ClientConfig{}, ErrSockFileNotProvided
serverHost := os.Getenv(appcommon.EnvServerHost)

serverPortStr := os.Getenv(appcommon.EnvServerPort)
if serverPortStr == "" {
return ClientConfig{}, ErrServerPortNotProvided
}

serverPort, err := strconv.ParseUint(serverPortStr, 10, 64)
if err != nil {
return ClientConfig{}, ErrServerPortInvalid
}

visorPKStr := os.Getenv(appcommon.EnvVisorPK)
Expand All @@ -58,9 +69,10 @@ func ClientConfigFromEnv() (ClientConfig, error) {
}

return ClientConfig{
VisorPK: visorPK,
SockFile: sockFile,
AppKey: appcommon.Key(appKey),
VisorPK: visorPK,
ServerHost: serverHost,
ServerPort: uint(serverPort),
AppKey: appcommon.Key(appKey),
}, nil
}

Expand All @@ -77,7 +89,7 @@ type Client struct {
// - log: logger instance.
// - config: client configuration.
func NewClient(log *logging.Logger, config ClientConfig) (*Client, error) {
rpcCl, err := rpc.Dial("unix", config.SockFile)
rpcCl, err := rpc.Dial("tcp", fmt.Sprintf("%s:%d", config.ServerHost, config.ServerPort))
if err != nil {
return nil, fmt.Errorf("error connecting to the app server: %v", err)
}
Expand Down
51 changes: 42 additions & 9 deletions pkg/app/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package app
import (
"errors"
"os"
"strconv"
"testing"

"github.com/SkycoinProject/dmsg/cipher"
Expand All @@ -20,7 +21,10 @@ func TestClientConfigFromEnv(t *testing.T) {
err := os.Setenv(appcommon.EnvAppKey, "")
require.NoError(t, err)

err = os.Setenv(appcommon.EnvSockFile, "")
err = os.Setenv(appcommon.EnvServerHost, "")
require.NoError(t, err)

err = os.Setenv(appcommon.EnvServerPort, "")
require.NoError(t, err)

err = os.Setenv(appcommon.EnvVisorPK, "")
Expand All @@ -33,15 +37,19 @@ func TestClientConfigFromEnv(t *testing.T) {
visorPK, _ := cipher.GenerateKeyPair()

wantCfg := ClientConfig{
VisorPK: visorPK,
SockFile: "sock.unix",
AppKey: "key",
VisorPK: visorPK,
ServerHost: "localhost",
ServerPort: 5505,
AppKey: "key",
}

err := os.Setenv(appcommon.EnvAppKey, string(wantCfg.AppKey))
require.NoError(t, err)

err = os.Setenv(appcommon.EnvSockFile, wantCfg.SockFile)
err = os.Setenv(appcommon.EnvServerHost, wantCfg.ServerHost)
require.NoError(t, err)

err = os.Setenv(appcommon.EnvServerPort, strconv.FormatUint(uint64(wantCfg.ServerPort), 10))
require.NoError(t, err)

err = os.Setenv(appcommon.EnvVisorPK, wantCfg.VisorPK.Hex())
Expand All @@ -59,14 +67,33 @@ func TestClientConfigFromEnv(t *testing.T) {
require.Equal(t, err, ErrAppKeyNotProvided)
})

t.Run("no sock file", func(t *testing.T) {
t.Run("no app server port", func(t *testing.T) {
resetEnv(t)

err := os.Setenv(appcommon.EnvAppKey, "val")
require.NoError(t, err)

err = os.Setenv(appcommon.EnvServerHost, "localhost")
require.NoError(t, err)

_, err = ClientConfigFromEnv()
require.Equal(t, err, ErrServerPortNotProvided)
})

t.Run("invalid app server port", func(t *testing.T) {
resetEnv(t)

err := os.Setenv(appcommon.EnvAppKey, "val")
require.NoError(t, err)

err = os.Setenv(appcommon.EnvServerHost, "localhost")
require.NoError(t, err)

err = os.Setenv(appcommon.EnvServerPort, "invalid")
require.NoError(t, err)

_, err = ClientConfigFromEnv()
require.Equal(t, err, ErrSockFileNotProvided)
require.Equal(t, err, ErrServerPortInvalid)
})

t.Run("no visor PK", func(t *testing.T) {
Expand All @@ -75,7 +102,10 @@ func TestClientConfigFromEnv(t *testing.T) {
err := os.Setenv(appcommon.EnvAppKey, "val")
require.NoError(t, err)

err = os.Setenv(appcommon.EnvSockFile, "val")
err = os.Setenv(appcommon.EnvServerHost, "localhost")
require.NoError(t, err)

err = os.Setenv(appcommon.EnvServerPort, "5505")
require.NoError(t, err)

_, err = ClientConfigFromEnv()
Expand All @@ -88,7 +118,10 @@ func TestClientConfigFromEnv(t *testing.T) {
err := os.Setenv(appcommon.EnvAppKey, "val")
require.NoError(t, err)

err = os.Setenv(appcommon.EnvSockFile, "val")
err = os.Setenv(appcommon.EnvServerHost, "localhost")
require.NoError(t, err)

err = os.Setenv(appcommon.EnvServerPort, "5505")
require.NoError(t, err)

err = os.Setenv(appcommon.EnvVisorPK, "val")
Expand Down
3 changes: 2 additions & 1 deletion pkg/visor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ type Config struct {

Interfaces InterfaceConfig `json:"interfaces"`

AppServerSockFile string `json:"app_server_sock_file"`
AppServerHost string `json:"app_server_host"`
AppServerPort uint `json:"app_server_port"`

RestartCheckDelay string `json:"restart_check_delay,omitempty"`
}
Expand Down
14 changes: 9 additions & 5 deletions pkg/visor/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,10 @@ func TestStartStopApp(t *testing.T) {
unknownApp := "bar"
app := apps["foo"].App

visorCfg := Config{}
visorCfg := Config{
AppServerHost: "",
AppServerPort: 5505,
}
visorCfg.Visor.StaticPubKey = pk

visor := &Visor{
Expand All @@ -164,10 +167,11 @@ func TestStartStopApp(t *testing.T) {
}()

appCfg1 := appcommon.Config{
Name: app,
SockFilePath: visorCfg.AppServerSockFile,
VisorPK: visorCfg.Visor.StaticPubKey.Hex(),
WorkDir: filepath.Join("", app),
Name: app,
ServerHost: visorCfg.AppServerHost,
ServerPort: visorCfg.AppServerPort,
VisorPK: visorCfg.Visor.StaticPubKey.Hex(),
WorkDir: filepath.Join("", app),
}

appArgs1 := append([]string{filepath.Join(visor.dir(), app)}, apps["foo"].Args...)
Expand Down
Loading