Skip to content
This repository has been archived by the owner on Feb 1, 2024. It is now read-only.

Commit

Permalink
Kelp ui autogenerate bot config and process mgmt (#177)
Browse files Browse the repository at this point in the history
- extract and unify common toml representations for ExchangeAPIKeys, ExchangeParams, and ExchangeHeaders
- UI dashboard work to autogenerate bot config, accounts, trustlines, along with new start and stop endpoints
  • Loading branch information
nikhilsaraf authored May 21, 2019
1 parent a6ffa95 commit e686d7f
Show file tree
Hide file tree
Showing 25 changed files with 766 additions and 156 deletions.
27 changes: 3 additions & 24 deletions cmd/server.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cmd

import (
"bufio"
"fmt"
"log"
"net/http"
Expand All @@ -16,6 +15,7 @@ import (
"github.com/spf13/cobra"
"github.com/stellar/kelp/gui"
"github.com/stellar/kelp/gui/backend"
"github.com/stellar/kelp/support/utils"
)

var serverCmd = &cobra.Command{
Expand Down Expand Up @@ -112,7 +112,7 @@ func runWithYarn(options serverInputs) {
os.Setenv("PORT", fmt.Sprintf("%d", *options.port))

log.Printf("Serving frontend via yarn on HTTP port: %d\n", *options.port)
e := runCommandStreamOutput(exec.Command("yarn", "--cwd", "gui/web", "start"))
e := utils.RunCommandStreamOutput(exec.Command("yarn", "--cwd", "gui/web", "start"))
if e != nil {
panic(e)
}
Expand All @@ -121,32 +121,11 @@ func runWithYarn(options serverInputs) {
func generateStaticFiles() {
log.Printf("generating contents of gui/web/build ...\n")

e := runCommandStreamOutput(exec.Command("yarn", "--cwd", "gui/web", "build"))
e := utils.RunCommandStreamOutput(exec.Command("yarn", "--cwd", "gui/web", "build"))
if e != nil {
panic(e)
}

log.Printf("... finished generating contents of gui/web/build\n")
log.Println()
}

func runCommandStreamOutput(command *exec.Cmd) error {
stdout, e := command.StdoutPipe()
if e != nil {
return fmt.Errorf("error while creating Stdout pipe: %s", e)
}
command.Start()

scanner := bufio.NewScanner(stdout)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
line := scanner.Text()
log.Printf("\t%s\n", line)
}

e = command.Wait()
if e != nil {
return fmt.Errorf("could not execute command: %s", e)
}
return nil
}
9 changes: 1 addition & 8 deletions cmd/trade.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,14 +198,6 @@ func makeExchangeShimSdex(
var e error
var exchangeShim api.ExchangeShim
if !botConfig.IsTradingSdex() {
exchangeAPIKeys := []api.ExchangeAPIKey{}
for _, apiKey := range botConfig.ExchangeAPIKeys {
exchangeAPIKeys = append(exchangeAPIKeys, api.ExchangeAPIKey{
Key: apiKey.Key,
Secret: apiKey.Secret,
})
}

exchangeParams := []api.ExchangeParam{}
for _, param := range botConfig.ExchangeParams {
exchangeParams = append(exchangeParams, api.ExchangeParam{
Expand All @@ -222,6 +214,7 @@ func makeExchangeShimSdex(
})
}

exchangeAPIKeys := botConfig.ExchangeAPIKeys.ToExchangeAPIKeys()
var exchangeAPI api.Exchange
exchangeAPI, e = plugins.MakeTradingExchange(botConfig.TradingExchange, exchangeAPIKeys, exchangeParams, exchangeHeaders, *options.simMode)
if e != nil {
Expand Down
129 changes: 123 additions & 6 deletions gui/backend/api_server.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,27 @@
package backend

import (
"bufio"
"bytes"
"fmt"
"io"
"log"
"os"
"os/exec"
"path/filepath"
"sync"

"github.com/stellar/kelp/support/utils"
)

// APIServer is an instance of the API service
type APIServer struct {
binPath string
dirPath string
binPath string
configsDir string
logsDir string
processes map[string]*exec.Cmd
processLock *sync.Mutex
}

// MakeAPIServer is a factory method
Expand All @@ -19,16 +31,121 @@ func MakeAPIServer() (*APIServer, error) {
return nil, fmt.Errorf("could not get binPath of currently running binary: %s", e)
}

dirPath := filepath.Dir(binPath)
configsDir := dirPath + "/ops/configs"
logsDir := dirPath + "/ops/logs"

return &APIServer{
binPath: binPath,
dirPath: dirPath,
binPath: binPath,
configsDir: configsDir,
logsDir: logsDir,
processes: map[string]*exec.Cmd{},
processLock: &sync.Mutex{},
}, nil
}

func (s *APIServer) runCommand(cmd string) ([]byte, error) {
func (s *APIServer) registerCommand(namespace string, c *exec.Cmd) error {
s.processLock.Lock()
defer s.processLock.Unlock()

if _, exists := s.processes[namespace]; exists {
return fmt.Errorf("process with namespace already exists: %s", namespace)
}

s.processes[namespace] = c
log.Printf("registered command under namespace '%s' with PID: %d", namespace, c.Process.Pid)
return nil
}

func (s *APIServer) unregisterCommand(namespace string) error {
s.processLock.Lock()
defer s.processLock.Unlock()

if c, exists := s.processes[namespace]; exists {
delete(s.processes, namespace)
log.Printf("unregistered command under namespace '%s' with PID: %d", namespace, c.Process.Pid)
return nil
}
return fmt.Errorf("process with namespace does not exist: %s", namespace)
}

func (s *APIServer) getCommand(namespace string) (*exec.Cmd, bool) {
s.processLock.Lock()
defer s.processLock.Unlock()

c, exists := s.processes[namespace]
return c, exists
}

func (s *APIServer) safeUnregisterCommand(namespace string) {
s.unregisterCommand(namespace)
}

func (s *APIServer) stopCommand(namespace string) error {
if c, exists := s.getCommand(namespace); exists {
e := s.unregisterCommand(namespace)
if e != nil {
return fmt.Errorf("could not stop command because of an error when unregistering command for namespace '%s': %s", namespace, e)
}

log.Printf("killing process %d\n", c.Process.Pid)
return c.Process.Kill()
}
return fmt.Errorf("process with namespace does not exist: %s", namespace)
}

func (s *APIServer) runKelpCommandBlocking(namespace string, cmd string) ([]byte, error) {
cmdString := fmt.Sprintf("%s %s", s.binPath, cmd)
return s.runBashCommandBlocking(namespace, cmdString)
}

func (s *APIServer) runKelpCommandBackground(namespace string, cmd string) (*exec.Cmd, error) {
cmdString := fmt.Sprintf("%s %s", s.binPath, cmd)
bytes, e := exec.Command("bash", "-c", cmdString).Output()
return s.runBashCommandBackground(namespace, cmdString, nil)
}

func (s *APIServer) runKelpCommandStreaming(cmd string) error {
cmdString := fmt.Sprintf("%s %s", s.binPath, cmd)
return utils.RunCommandStreamOutput(exec.Command("bash", "-c", cmdString))
}

func (s *APIServer) runBashCommandBlocking(namespace string, cmd string) ([]byte, error) {
var buf bytes.Buffer
writer := bufio.NewWriter(&buf)
c, e := s.runBashCommandBackground(namespace, cmd, writer)
if e != nil {
return nil, fmt.Errorf("could not run bash command in background '%s': %s", cmd, e)
}

e = c.Wait()
if e != nil {
return nil, fmt.Errorf("error waiting for bash command '%s': %s", cmd, e)
}

e = s.unregisterCommand(namespace)
if e != nil {
return nil, fmt.Errorf("could not run bash command '%s': %s", cmd, e)
return nil, fmt.Errorf("error unregistering bash command '%s': %s", cmd, e)
}
return bytes, nil

return buf.Bytes(), nil
}

func (s *APIServer) runBashCommandBackground(namespace string, cmd string, writer io.Writer) (*exec.Cmd, error) {
c := exec.Command("bash", "-c", cmd)
if writer != nil {
c.Stdout = writer
}

e := c.Start()
if e != nil {
return c, fmt.Errorf("could not start bash command '%s': %s", cmd, e)
}

e = s.registerCommand(namespace, c)
if e != nil {
return nil, fmt.Errorf("error registering bash command '%s': %s", cmd, e)
}

return c, nil
}
Loading

0 comments on commit e686d7f

Please sign in to comment.