Skip to content
This repository has been archived by the owner on Feb 1, 2024. It is now read-only.

Kelp ui autogenerate bot config and process mgmt #177

Merged
merged 14 commits into from
May 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 3 additions & 24 deletions cmd/server.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cmd

import (
"bufio"
"fmt"
"log"
"net/http"
Expand All @@ -16,6 +15,7 @@ import (
"github.com/spf13/cobra"
"github.com/stellar/kelp/gui"
"github.com/stellar/kelp/gui/backend"
"github.com/stellar/kelp/support/utils"
)

var serverCmd = &cobra.Command{
Expand Down Expand Up @@ -112,7 +112,7 @@ func runWithYarn(options serverInputs) {
os.Setenv("PORT", fmt.Sprintf("%d", *options.port))

log.Printf("Serving frontend via yarn on HTTP port: %d\n", *options.port)
e := runCommandStreamOutput(exec.Command("yarn", "--cwd", "gui/web", "start"))
e := utils.RunCommandStreamOutput(exec.Command("yarn", "--cwd", "gui/web", "start"))
if e != nil {
panic(e)
}
Expand All @@ -121,32 +121,11 @@ func runWithYarn(options serverInputs) {
func generateStaticFiles() {
log.Printf("generating contents of gui/web/build ...\n")

e := runCommandStreamOutput(exec.Command("yarn", "--cwd", "gui/web", "build"))
e := utils.RunCommandStreamOutput(exec.Command("yarn", "--cwd", "gui/web", "build"))
if e != nil {
panic(e)
}

log.Printf("... finished generating contents of gui/web/build\n")
log.Println()
}

func runCommandStreamOutput(command *exec.Cmd) error {
stdout, e := command.StdoutPipe()
if e != nil {
return fmt.Errorf("error while creating Stdout pipe: %s", e)
}
command.Start()

scanner := bufio.NewScanner(stdout)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
line := scanner.Text()
log.Printf("\t%s\n", line)
}

e = command.Wait()
if e != nil {
return fmt.Errorf("could not execute command: %s", e)
}
return nil
}
9 changes: 1 addition & 8 deletions cmd/trade.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,14 +198,6 @@ func makeExchangeShimSdex(
var e error
var exchangeShim api.ExchangeShim
if !botConfig.IsTradingSdex() {
exchangeAPIKeys := []api.ExchangeAPIKey{}
for _, apiKey := range botConfig.ExchangeAPIKeys {
exchangeAPIKeys = append(exchangeAPIKeys, api.ExchangeAPIKey{
Key: apiKey.Key,
Secret: apiKey.Secret,
})
}

exchangeParams := []api.ExchangeParam{}
for _, param := range botConfig.ExchangeParams {
exchangeParams = append(exchangeParams, api.ExchangeParam{
Expand All @@ -222,6 +214,7 @@ func makeExchangeShimSdex(
})
}

exchangeAPIKeys := botConfig.ExchangeAPIKeys.ToExchangeAPIKeys()
var exchangeAPI api.Exchange
exchangeAPI, e = plugins.MakeTradingExchange(botConfig.TradingExchange, exchangeAPIKeys, exchangeParams, exchangeHeaders, *options.simMode)
if e != nil {
Expand Down
129 changes: 123 additions & 6 deletions gui/backend/api_server.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,27 @@
package backend

import (
"bufio"
"bytes"
"fmt"
"io"
"log"
"os"
"os/exec"
"path/filepath"
"sync"

"github.com/stellar/kelp/support/utils"
)

// APIServer is an instance of the API service
type APIServer struct {
binPath string
dirPath string
binPath string
configsDir string
logsDir string
processes map[string]*exec.Cmd
processLock *sync.Mutex
}

// MakeAPIServer is a factory method
Expand All @@ -19,16 +31,121 @@ func MakeAPIServer() (*APIServer, error) {
return nil, fmt.Errorf("could not get binPath of currently running binary: %s", e)
}

dirPath := filepath.Dir(binPath)
configsDir := dirPath + "/ops/configs"
logsDir := dirPath + "/ops/logs"

return &APIServer{
binPath: binPath,
dirPath: dirPath,
binPath: binPath,
configsDir: configsDir,
logsDir: logsDir,
processes: map[string]*exec.Cmd{},
processLock: &sync.Mutex{},
}, nil
}

func (s *APIServer) runCommand(cmd string) ([]byte, error) {
func (s *APIServer) registerCommand(namespace string, c *exec.Cmd) error {
s.processLock.Lock()
defer s.processLock.Unlock()

if _, exists := s.processes[namespace]; exists {
return fmt.Errorf("process with namespace already exists: %s", namespace)
}

s.processes[namespace] = c
log.Printf("registered command under namespace '%s' with PID: %d", namespace, c.Process.Pid)
return nil
}

func (s *APIServer) unregisterCommand(namespace string) error {
s.processLock.Lock()
defer s.processLock.Unlock()

if c, exists := s.processes[namespace]; exists {
delete(s.processes, namespace)
log.Printf("unregistered command under namespace '%s' with PID: %d", namespace, c.Process.Pid)
return nil
}
return fmt.Errorf("process with namespace does not exist: %s", namespace)
}

func (s *APIServer) getCommand(namespace string) (*exec.Cmd, bool) {
s.processLock.Lock()
defer s.processLock.Unlock()

c, exists := s.processes[namespace]
return c, exists
}

func (s *APIServer) safeUnregisterCommand(namespace string) {
s.unregisterCommand(namespace)
}

func (s *APIServer) stopCommand(namespace string) error {
if c, exists := s.getCommand(namespace); exists {
e := s.unregisterCommand(namespace)
if e != nil {
return fmt.Errorf("could not stop command because of an error when unregistering command for namespace '%s': %s", namespace, e)
}

log.Printf("killing process %d\n", c.Process.Pid)
return c.Process.Kill()
}
return fmt.Errorf("process with namespace does not exist: %s", namespace)
}

func (s *APIServer) runKelpCommandBlocking(namespace string, cmd string) ([]byte, error) {
cmdString := fmt.Sprintf("%s %s", s.binPath, cmd)
return s.runBashCommandBlocking(namespace, cmdString)
}

func (s *APIServer) runKelpCommandBackground(namespace string, cmd string) (*exec.Cmd, error) {
cmdString := fmt.Sprintf("%s %s", s.binPath, cmd)
bytes, e := exec.Command("bash", "-c", cmdString).Output()
return s.runBashCommandBackground(namespace, cmdString, nil)
}

func (s *APIServer) runKelpCommandStreaming(cmd string) error {
cmdString := fmt.Sprintf("%s %s", s.binPath, cmd)
return utils.RunCommandStreamOutput(exec.Command("bash", "-c", cmdString))
}

func (s *APIServer) runBashCommandBlocking(namespace string, cmd string) ([]byte, error) {
var buf bytes.Buffer
writer := bufio.NewWriter(&buf)
c, e := s.runBashCommandBackground(namespace, cmd, writer)
if e != nil {
return nil, fmt.Errorf("could not run bash command in background '%s': %s", cmd, e)
}

e = c.Wait()
if e != nil {
return nil, fmt.Errorf("error waiting for bash command '%s': %s", cmd, e)
}

e = s.unregisterCommand(namespace)
if e != nil {
return nil, fmt.Errorf("could not run bash command '%s': %s", cmd, e)
return nil, fmt.Errorf("error unregistering bash command '%s': %s", cmd, e)
}
return bytes, nil

return buf.Bytes(), nil
}

func (s *APIServer) runBashCommandBackground(namespace string, cmd string, writer io.Writer) (*exec.Cmd, error) {
c := exec.Command("bash", "-c", cmd)
if writer != nil {
c.Stdout = writer
}

e := c.Start()
if e != nil {
return c, fmt.Errorf("could not start bash command '%s': %s", cmd, e)
}

e = s.registerCommand(namespace, c)
if e != nil {
return nil, fmt.Errorf("error registering bash command '%s': %s", cmd, e)
}

return c, nil
}
Loading