Skip to content

Commit

Permalink
feat: shutdown in reverse dependency order
Browse files Browse the repository at this point in the history
address code review

fix: unused variable

refactor: smaller functions

test: shutdown in order

Add shutdown order test

fix: tests
  • Loading branch information
anmonteiro committed Mar 2, 2024
1 parent 82b51ba commit dc044a5
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 72 deletions.
29 changes: 29 additions & 0 deletions fixtures-code/process-compose-shutdown-inorder.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
version: "0.5"

log_level: debug
log_length: 1000

processes:
procA:
command: |
trap 'echo "A: exit"' SIGTERM
echo "A: starting"
sleep 3
procB:
command: |
trap 'echo "B: exit"' SIGTERM
echo "B: starting"
sleep 3
depends_on:
procA:
condition: process_started

procC:
command: |
trap 'echo "C: exit"' SIGTERM
echo "C: starting"
sleep 3
depends_on:
procB:
condition: process_started
18 changes: 12 additions & 6 deletions src/app/project_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package app
import "github.com/f1bonacc1/process-compose/src/types"

type ProjectOpts struct {
project *types.Project
processesToRun []string
noDeps bool
mainProcess string
mainProcessArgs []string
isTuiOn bool
project *types.Project
processesToRun []string
noDeps bool
mainProcess string
mainProcessArgs []string
isTuiOn bool
isOrderedShutDown bool
}

func (p *ProjectOpts) WithProject(project *types.Project) *ProjectOpts {
Expand Down Expand Up @@ -39,3 +40,8 @@ func (p *ProjectOpts) WithIsTuiOn(isTuiOn bool) *ProjectOpts {
p.isTuiOn = isTuiOn
return p
}

func (p *ProjectOpts) WithOrderedShutDown(isOrderedShutDown bool) *ProjectOpts {
p.isOrderedShutDown = isOrderedShutDown
return p
}
155 changes: 121 additions & 34 deletions src/app/project_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,30 @@ import (
"os"
"os/user"
"runtime"
"slices"
"sync"
"time"

"github.com/rs/zerolog/log"
)

type ProjectRunner struct {
procConfMutex sync.Mutex
project *types.Project
logsMutex sync.Mutex
processLogs map[string]*pclog.ProcessLogBuffer
statesMutex sync.Mutex
processStates map[string]*types.ProcessState
runProcMutex sync.Mutex
runningProcesses map[string]*Process
logger pclog.PcLogger
waitGroup sync.WaitGroup
exitCode int
projectState *types.ProjectState
mainProcess string
mainProcessArgs []string
isTuiOn bool
procConfMutex sync.Mutex
project *types.Project
logsMutex sync.Mutex
processLogs map[string]*pclog.ProcessLogBuffer
statesMutex sync.Mutex
processStates map[string]*types.ProcessState
runProcMutex sync.Mutex
runningProcesses map[string]*Process
logger pclog.PcLogger
waitGroup sync.WaitGroup
exitCode int
projectState *types.ProjectState
mainProcess string
mainProcessArgs []string
isTuiOn bool
isOrderedShutDown bool
}

func (p *ProjectRunner) GetLexicographicProcessNames() ([]string, error) {
Expand Down Expand Up @@ -322,27 +324,111 @@ func (p *ProjectRunner) GetProcessPorts(name string) (*types.ProcessPorts, error
return ports, nil
}

func (p *ProjectRunner) runningProcessesReverseDependencies() map[string]map[string]*Process {
reverseDependencies := make(map[string]map[string]*Process)

// `p.runProcMutex` lock is assumed to have been acquired when calling
// this function. It is currently called by `ShutDownProject()`.
for _, process := range p.runningProcesses {
for k := range process.procConf.DependsOn {
if runningProc, ok := p.runningProcesses[k]; ok {
if _, ok := reverseDependencies[runningProc.getName()]; !ok {
dep := make(map[string]*Process)
dep[process.getName()] = process
reverseDependencies[runningProc.getName()] = dep
}
} else {
continue
}
}
}

return reverseDependencies
}

func (p *ProjectRunner) shutDownInOrder(wg *sync.WaitGroup, shutdownOrder []*Process) {
reverseDependencies := p.runningProcessesReverseDependencies()
for _, process := range shutdownOrder {
wg.Add(1)
go func(proc *Process) {
defer wg.Done()
waitForDepsWg := sync.WaitGroup{}
if revDeps, ok := reverseDependencies[proc.getName()]; ok {
for _, runningProc := range revDeps {
waitForDepsWg.Add(1)
go func(pr *Process) {
pr.waitForCompletion()
waitForDepsWg.Done()
}(runningProc)
}
}
waitForDepsWg.Wait()
log.Debug().Msgf("[%s]: waited for all dependencies to shut down", proc.getName())

err := proc.shutDown()
if err != nil {
log.Err(err).Msgf("failed to shutdown %s", proc.getName())
return
}
proc.waitForCompletion()
}(process)
}
}

func (p *ProjectRunner) shutDownAndWait(shutdownOrder []*Process) {
wg := sync.WaitGroup{}
if p.isOrderedShutDown {
p.shutDownInOrder(&wg, shutdownOrder)
} else {
for _, proc := range shutdownOrder {
err := proc.shutDown()
if err != nil {
log.Err(err).Msgf("failed to shutdown %s", proc.getName())
continue
}
wg.Add(1)
go func(pr *Process) {
pr.waitForCompletion()
wg.Done()
}(proc)
}
}

wg.Wait()
}

func (p *ProjectRunner) ShutDownProject() error {
p.runProcMutex.Lock()
defer p.runProcMutex.Unlock()
runProc := p.runningProcesses
for _, proc := range runProc {
proc.prepareForShutDown()
}
wg := sync.WaitGroup{}
for _, proc := range runProc {
err := proc.shutDown()

shutdownOrder := []*Process{}
if p.isOrderedShutDown {
err := p.project.WithProcesses([]string{}, func(process types.ProcessConfig) error {
if runningProc, ok := p.runningProcesses[process.ReplicaName]; ok {
shutdownOrder = append(shutdownOrder, runningProc)
}
return nil
})
if err != nil {
log.Err(err).Msgf("failed to shutdown %s", proc.getName())
continue
log.Error().Msgf("Failed to build project run order: %s", err.Error())
}
slices.Reverse(shutdownOrder)
} else {
for _, proc := range p.runningProcesses {
shutdownOrder = append(shutdownOrder, proc)
}
wg.Add(1)
go func(pr *Process) {
pr.waitForCompletion()
wg.Done()
}(proc)
}
wg.Wait()

var nameOrder []string
for _, v := range shutdownOrder {
nameOrder = append(nameOrder, v.getName())
}
log.Debug().Msgf("Shutting down %d processes. Order: %q", len(shutdownOrder), nameOrder)
for _, proc := range shutdownOrder {
proc.prepareForShutDown()
}

p.shutDownAndWait(shutdownOrder)
return nil
}

Expand Down Expand Up @@ -644,10 +730,11 @@ func NewProjectRunner(opts *ProjectOpts) (*ProjectRunner, error) {
username = current.Username
}
runner := &ProjectRunner{
project: opts.project,
mainProcess: opts.mainProcess,
mainProcessArgs: opts.mainProcessArgs,
isTuiOn: opts.isTuiOn,
project: opts.project,
mainProcess: opts.mainProcess,
mainProcessArgs: opts.mainProcessArgs,
isTuiOn: opts.isTuiOn,
isOrderedShutDown: opts.isOrderedShutDown,
projectState: &types.ProjectState{
FileNames: opts.project.FileNames,
StartTime: time.Now(),
Expand Down
95 changes: 95 additions & 0 deletions src/app/system_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package app

import (
"bufio"
"github.com/f1bonacc1/process-compose/src/loader"
"github.com/f1bonacc1/process-compose/src/types"
"os"
"path/filepath"
"reflect"
"slices"
"strings"
"testing"
"time"
)
Expand Down Expand Up @@ -406,3 +410,94 @@ func TestSystem_TestProcListToRun(t *testing.T) {
}
})
}

func TestSystem_TestProcListShutsDownInOrder(t *testing.T) {
fixture1 := filepath.Join("..", "..", "fixtures-code", "process-compose-shutdown-inorder.yaml")
t.Run("Single Proc with deps", func(t *testing.T) {

project, err := loader.Load(&loader.LoaderOptions{
FileNames: []string{fixture1},
})
if err != nil {
t.Errorf(err.Error())
return
}
numProc := len(project.Processes)
runner, err := NewProjectRunner(&ProjectOpts{
project: project,
processesToRun: []string{},
mainProcessArgs: []string{},
isOrderedShutDown: true,
})
if err != nil {
t.Errorf(err.Error())
return
}
if len(runner.project.Processes) != numProc {
t.Errorf("should have %d processes", numProc)
}
for name, proc := range runner.project.Processes {
if proc.Disabled {
t.Errorf("process %s is disabled", name)
}
}
file, err := os.CreateTemp("/tmp", "pc_log.*.log")
defer os.Remove(file.Name())
project.LogLocation = file.Name()
project.LoggerConfig = &types.LoggerConfig{
FieldsOrder: []string{"message"},
DisableJSON: true,
TimestampFormat: "",
NoMetadata: true,
FlushEachLine: true,
NoColor: true,
}
go runner.Run()
time.Sleep(10 * time.Millisecond)
states, err := runner.GetProcessesState()
if err != nil {
t.Errorf(err.Error())
return
}
want := 3
if len(states.States) != want {
t.Errorf("len(states.States) = %d, want %d", len(states.States), want)
}

time.Sleep(10 * time.Millisecond)
err = runner.ShutDownProject()
if err != nil {
t.Errorf(err.Error())
return
}
states, err = runner.GetProcessesState()
if err != nil {
t.Errorf(err.Error())
return
}
runningProcesses := 0
for _, processState := range states.States {
if processState.IsRunning {
runningProcesses++
}
}
want = 0
if runningProcesses != want {
t.Errorf("runningProcesses = %d, want %d", runningProcesses, want)
}
//read file and validate the shutdown order
scanner := bufio.NewScanner(file)
order := make([]string, 0)
for scanner.Scan() {
line := scanner.Text()
if strings.Contains(line, "exit") {
order = append(order, line)
}
}
wantOrder := []string{"C: exit", "B: exit", "A: exit"}
if !slices.Equal(order, wantOrder) {
t.Errorf("content = %v, want %v", order, wantOrder)
return
}
})
}
1 change: 1 addition & 0 deletions src/cmd/project_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func getProjectRunner(process []string, noDeps bool, mainProcess string, mainPro
WithMainProcessArgs(mainProcessArgs).
WithProject(project).
WithProcessesToRun(process).
WithOrderedShutDown(*pcFlags.IsOrderedShutDown).
WithNoDeps(noDeps),
)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions src/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func init() {
rootCmd.Flags().BoolVarP(pcFlags.Headless, "tui", "t", *pcFlags.Headless, "enable TUI (-t=false) (env: "+config.EnvVarNameTui+")")
rootCmd.PersistentFlags().BoolVar(pcFlags.KeepTuiOn, "keep-tui", *pcFlags.KeepTuiOn, "keep TUI running even after all processes exit")
rootCmd.PersistentFlags().BoolVar(pcFlags.NoServer, "no-server", *pcFlags.NoServer, "disable HTTP server (env: "+config.EnvVarNameNoServer+")")
rootCmd.PersistentFlags().BoolVar(pcFlags.IsOrderedShutDown, "ordered-shutdown", *pcFlags.IsOrderedShutDown, "shut down processes in reverse dependency order")
rootCmd.Flags().BoolVarP(pcFlags.HideDisabled, "hide-disabled", "d", *pcFlags.HideDisabled, "hide disabled processes")
rootCmd.Flags().IntVarP(pcFlags.RefreshRate, "ref-rate", "r", *pcFlags.RefreshRate, "TUI refresh rate in seconds")
rootCmd.PersistentFlags().IntVarP(pcFlags.PortNum, "port", "p", *pcFlags.PortNum, "port number (env: "+config.EnvVarNamePort+")")
Expand Down
Loading

0 comments on commit dc044a5

Please sign in to comment.