Skip to content
This repository has been archived by the owner on Feb 1, 2024. It is now read-only.

Commit

Permalink
Kelp GUI: make process namespace multi-tenant and thread through
Browse files Browse the repository at this point in the history
bots using the same name from different users (such as autogen bot) could not be started simultaneously. now we have pre-pended the user_id to the process map key
  • Loading branch information
nikhilsaraf committed Apr 17, 2021
1 parent 86214e8 commit 8002a95
Show file tree
Hide file tree
Showing 12 changed files with 109 additions and 95 deletions.
2 changes: 2 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/spf13/cobra"

"github.com/stellar/kelp/gui/backend"
"github.com/stellar/kelp/support/networking"
"github.com/stellar/kelp/support/sdk"
"github.com/stellar/kelp/support/utils"
Expand Down Expand Up @@ -71,6 +72,7 @@ var rootCcxtRestURL *string

func init() {
validateBuild()
backend.SetVersionString(guiVersion, version)

rootCcxtRestURL = RootCmd.PersistentFlags().String("ccxt-rest-url", "", "URL to use for the CCXT-rest API. Takes precendence over the CCXT_REST_URL param set in the botConfg file for the trade command and passed as a parameter into the Kelp subprocesses started by the GUI (default URL is https://localhost:3000)")

Expand Down
34 changes: 22 additions & 12 deletions cmd/server_amd64.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ func init() {

uiLogsDirPath := kos.GetDotKelpWorkingDir().Join(uiLogsDir)
log.Printf("calling mkdir on uiLogsDirPath: %s ...", uiLogsDirPath.AsString())
e = kos.Mkdir(uiLogsDirPath)
// no need to pass a userID since we are not running under the context of any user at this point
e = kos.Mkdir("_", uiLogsDirPath)
if e != nil {
panic(errors.Wrap(e, "could not mkdir on uiLogsDirPath: "+uiLogsDirPath.AsString()))
}
Expand Down Expand Up @@ -293,29 +294,34 @@ func init() {
ccxtBinPath := ccxtDestDir.Join(ccxtBinaryName)

log.Printf("mkdir ccxtDirPath: %s ...", ccxtDirPath.AsString())
e := kos.Mkdir(ccxtDirPath)
// no need to pass a userID since we are not running under the context of any user at this point
e := kos.Mkdir("_", ccxtDirPath)
if e != nil {
panic(fmt.Errorf("could not mkdir for ccxtDirPath: %s", e))
}

if runtime.GOOS == "windows" {
ccxtSourceDir := kos.GetBinDir().Join("ccxt").Join(ccxtFilenameNoExt)
e = copyCcxtFolder(kos, ccxtSourceDir, ccxtDestDir)
// no need to pass a userID since we are not running under the context of any user at this point
e = copyCcxtFolder(kos, "_", ccxtSourceDir, ccxtDestDir)
if e != nil {
panic(e)
}
} else {
ccxtBundledZipPath := kos.GetBinDir().Join("ccxt").Join(filenameWithExt)
ccxtZipDestPath := ccxtDirPath.Join(filenameWithExt)
e = copyOrDownloadCcxtBinary(kos, ccxtBundledZipPath, ccxtZipDestPath, filenameWithExt)
// no need to pass a userID since we are not running under the context of any user at this point
e = copyOrDownloadCcxtBinary(kos, "_", ccxtBundledZipPath, ccxtZipDestPath, filenameWithExt)
if e != nil {
panic(e)
}

unzipCcxtFile(kos, ccxtDirPath, ccxtBinPath, filenameWithExt)
// no need to pass a userID since we are not running under the context of any user at this point
unzipCcxtFile(kos, "_", ccxtDirPath, ccxtBinPath, filenameWithExt)
}

e = runCcxtBinary(kos, ccxtBinPath)
// no need to pass a userID since we are not running under the context of any user at this point
e = runCcxtBinary(kos, "_", ccxtBinPath)
if e != nil {
panic(e)
}
Expand Down Expand Up @@ -476,13 +482,14 @@ func setMiddleware(r *chi.Mux) {

func copyCcxtFolder(
kos *kelpos.KelpOS,
userID string,
ccxtSourceDir *kelpos.OSPath,
ccxtDestDir *kelpos.OSPath,
) error {
log.Printf("copying ccxt directory from %s to location %s ...", ccxtSourceDir.AsString(), ccxtDestDir.AsString())

cpCmd := fmt.Sprintf("cp -a %s %s", ccxtSourceDir.Unix(), ccxtDestDir.Unix())
_, e := kos.Blocking("cp-ccxt", cpCmd)
_, e := kos.Blocking(userID, "cp-ccxt", cpCmd)
if e != nil {
return fmt.Errorf("unable to copy ccxt directory from %s to %s: %s", ccxtSourceDir.AsString(), ccxtDestDir.AsString(), e)
}
Expand All @@ -493,6 +500,7 @@ func copyCcxtFolder(

func copyOrDownloadCcxtBinary(
kos *kelpos.KelpOS,
userID string,
ccxtBundledZipPath *kelpos.OSPath,
ccxtZipDestPath *kelpos.OSPath,
filenameWithExt string,
Expand All @@ -505,7 +513,7 @@ func copyOrDownloadCcxtBinary(
log.Printf("copying ccxt from %s to location %s ...", ccxtBundledZipPath.Unix(), ccxtZipDestPath.Unix())

cpCmd := fmt.Sprintf("cp %s %s", ccxtBundledZipPath.Unix(), ccxtZipDestPath.Unix())
_, e = kos.Blocking("cp-ccxt", cpCmd)
_, e = kos.Blocking(userID, "cp-ccxt", cpCmd)
if e != nil {
return fmt.Errorf("unable to copy ccxt zip file from %s to %s: %s", ccxtBundledZipPath.Unix(), ccxtZipDestPath.Unix(), e)
}
Expand Down Expand Up @@ -546,6 +554,7 @@ func copyOrDownloadCcxtBinary(

func unzipCcxtFile(
kos *kelpos.KelpOS,
userID string,
ccxtDir *kelpos.OSPath,
ccxtBinPath *kelpos.OSPath,
filenameWithExt string,
Expand All @@ -558,21 +567,21 @@ func unzipCcxtFile(

log.Printf("unzipping file %s ... ", filenameWithExt)
zipCmd := fmt.Sprintf("cd %s && unzip %s", ccxtDir.Unix(), filenameWithExt)
_, e := kos.Blocking("zip", zipCmd)
_, e := kos.Blocking(userID, "zip", zipCmd)
if e != nil {
log.Fatal(errors.Wrap(e, fmt.Sprintf("unable to unzip file %s in directory %s", filenameWithExt, ccxtDir.AsString())))
}
log.Printf("done\n")
}

func runCcxtBinary(kos *kelpos.KelpOS, ccxtBinPath *kelpos.OSPath) error {
func runCcxtBinary(kos *kelpos.KelpOS, userID string, ccxtBinPath *kelpos.OSPath) error {
if _, e := os.Stat(ccxtBinPath.Native()); os.IsNotExist(e) {
return fmt.Errorf("path to ccxt binary (%s) does not exist", ccxtBinPath.AsString())
}

log.Printf("running binary %s", ccxtBinPath.AsString())
// TODO CCXT should be run at the port specified by rootCcxtRestURL, currently it will default to port 3000 even if the config file specifies otherwise
_, e := kos.Background("ccxt-rest", ccxtBinPath.Unix())
_, e := kos.Background(userID, "ccxt-rest", ccxtBinPath.Unix())
if e != nil {
log.Fatal(errors.Wrap(e, fmt.Sprintf("unable to run ccxt file at location %s", ccxtBinPath.AsString())))
}
Expand Down Expand Up @@ -653,7 +662,8 @@ func writeTrayIcon(kos *kelpos.KelpOS, trayIconPath *kelpos.OSPath, assetsDirPat
// create dir if not exists
if _, e := os.Stat(assetsDirPath.Native()); os.IsNotExist(e) {
log.Printf("mkdir assetsDirPath: %s ...", assetsDirPath.AsString())
e = kos.Mkdir(assetsDirPath)
// no need to pass a userID since we are not running under the context of any user at this point
e = kos.Mkdir("_", assetsDirPath)
if e != nil {
return errors.Wrap(e, "could not mkdir for assetsDirPath: "+assetsDirPath.AsString())
}
Expand Down
12 changes: 6 additions & 6 deletions gui/backend/api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,33 +281,33 @@ func (s *APIServer) writeJsonWithLog(w http.ResponseWriter, v interface{}, doLog
w.Write(marshalledJson)
}

func (s *APIServer) runKelpCommandBlocking(namespace string, cmd string) ([]byte, error) {
func (s *APIServer) runKelpCommandBlocking(userID string, namespace string, cmd string) ([]byte, error) {
// There is a weird issue on windows where the absolute path for the kelp binary does not work on the release GUI
// version because of the unzipped directory name but it will work on the released cli version or if we change the
// name of the folder in which the GUI version is unzipped.
// To avoid these issues we only invoke with the binary name as opposed to the absolute path that contains the
// directory name. see start_bot.go for some experimentation with absolute and relative paths
cmdString := fmt.Sprintf("%s %s", s.kelpBinPath.Unix(), cmd)
return s.kos.Blocking(namespace, cmdString)
return s.kos.Blocking(userID, namespace, cmdString)
}

func (s *APIServer) runKelpCommandBackground(namespace string, cmd string) (*kelpos.Process, error) {
func (s *APIServer) runKelpCommandBackground(userID string, namespace string, cmd string) (*kelpos.Process, error) {
// There is a weird issue on windows where the absolute path for the kelp binary does not work on the release GUI
// version because of the unzipped directory name but it will work on the released cli version or if we change the
// name of the folder in which the GUI version is unzipped.
// To avoid these issues we only invoke with the binary name as opposed to the absolute path that contains the
// directory name. see start_bot.go for some experimentation with absolute and relative paths
cmdString := fmt.Sprintf("%s %s", s.kelpBinPath.Unix(), cmd)
return s.kos.Background(namespace, cmdString)
return s.kos.Background(userID, namespace, cmdString)
}

func (s *APIServer) setupOpsDirectory(userID string) error {
e := s.kos.Mkdir(s.botConfigsPathForUser(userID))
e := s.kos.Mkdir(userID, s.botConfigsPathForUser(userID))
if e != nil {
return fmt.Errorf("error setting up configs directory (%s): %s", s.botConfigsPathForUser(userID).Native(), e)
}

e = s.kos.Mkdir(s.botLogsPathForUser(userID))
e = s.kos.Mkdir(userID, s.botLogsPathForUser(userID))
if e != nil {
return fmt.Errorf("error setting up logs directory (%s): %s", s.botLogsPathForUser(userID).Native(), e)
}
Expand Down
2 changes: 1 addition & 1 deletion gui/backend/delete_bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (s *APIServer) deleteBot(w http.ResponseWriter, r *http.Request) {
// delete configs
botPrefix := model2.GetPrefix(botName)
botConfigPath := s.botConfigsPathForUser(req.UserData.ID).Join(botPrefix)
_, e = s.kos.Blocking("rm", fmt.Sprintf("rm %s*", botConfigPath.Unix()))
_, e = s.kos.Blocking(req.UserData.ID, "rm", fmt.Sprintf("rm %s*", botConfigPath.Unix()))
if e != nil {
s.writeKelpError(req.UserData, w, makeKelpErrorResponseWrapper(
errorTypeBot,
Expand Down
2 changes: 1 addition & 1 deletion gui/backend/generate_bot_name.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (s *APIServer) doGenerateBotName(userData UserData) (string, error) {

func (s *APIServer) prefixExists(userData UserData, prefix string) (bool, error) {
command := fmt.Sprintf("ls %s | grep %s", s.botConfigsPathForUser(userData.ID).Unix(), prefix)
_, e := s.kos.Blocking("prefix", command)
_, e := s.kos.Blocking(userData.ID, "prefix", command)
if e != nil {
if strings.Contains(e.Error(), "exit status 1") {
return false, nil
Expand Down
2 changes: 1 addition & 1 deletion gui/backend/list_bots.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (s *APIServer) listBots(w http.ResponseWriter, r *http.Request) {

func (s *APIServer) doListBots(userData UserData) ([]model2.Bot, error) {
bots := []model2.Bot{}
resultBytes, e := s.kos.Blocking("ls", fmt.Sprintf("ls %s | sort", s.botConfigsPathForUser(userData.ID).Unix()))
resultBytes, e := s.kos.Blocking(userData.ID, "ls", fmt.Sprintf("ls %s | sort", s.botConfigsPathForUser(userData.ID).Unix()))
if e != nil {
return bots, fmt.Errorf("error when listing bots: %s", e)
}
Expand Down
4 changes: 2 additions & 2 deletions gui/backend/start_bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (s *APIServer) doStartBot(userData UserData, botName string, strategy strin
}
log.Printf("run command for bot '%s': %s\n", botName, command)

p, e := s.runKelpCommandBackground(botName, command)
p, e := s.runKelpCommandBackground(userData.ID, botName, command)
if e != nil {
return fmt.Errorf("could not start bot %s: %s", botName, e)
}
Expand All @@ -156,7 +156,7 @@ func (s *APIServer) doStartBot(userData UserData, botName string, strategy strin
}

go func(kelpCommand *exec.Cmd, name string) {
defer s.kos.SafeUnregister(name)
defer s.kos.SafeUnregister(userData.ID, name)

e := kelpCommand.Wait()
if e != nil {
Expand Down
2 changes: 1 addition & 1 deletion gui/backend/stop_bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *APIServer) doStopBot(userData UserData, botName string) error {
return fmt.Errorf("error advancing bot state: %s", e)
}

e = s.kos.Stop(botName)
e = s.kos.Stop(userData.ID, botName)
if e != nil {
return fmt.Errorf("error when killing bot %s: %s", botName, e)
}
Expand Down
21 changes: 9 additions & 12 deletions gui/backend/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,16 @@ import (
"strings"
)

func (s *APIServer) version(w http.ResponseWriter, r *http.Request) {
guiVersionBytes, e := s.runKelpCommandBlocking("version", "version | grep 'gui version' | cut -d':' -f2,3")
if e != nil {
s.writeError(w, fmt.Sprintf("error in version command: %s\n", e))
return
}
cliVersionBytes, e := s.runKelpCommandBlocking("version", "version | grep 'cli version' | cut -d':' -f2,3")
if e != nil {
s.writeError(w, fmt.Sprintf("error in version command: %s\n", e))
return
}
// this will be set automatically from root command
var versionString = ""

// SetVersionString sets the version string to be displayed in the GUI
func SetVersionString(guiVersion string, cliVersion string) {
versionString = fmt.Sprintf("%s (%s)", strings.TrimSpace(guiVersion), strings.TrimSpace(cliVersion))
}

versionBytes := []byte(fmt.Sprintf("%s (%s)", strings.TrimSpace(string(guiVersionBytes)), strings.TrimSpace(string(cliVersionBytes))))
func (s *APIServer) version(w http.ResponseWriter, r *http.Request) {
versionBytes := []byte(versionString)
w.WriteHeader(http.StatusOK)
w.Write(versionBytes)
}
Loading

0 comments on commit 8002a95

Please sign in to comment.