Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: shutdown in reverse dependency order #147

Merged
merged 1 commit into from
Mar 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions fixtures-code/process-compose-shutdown-inorder.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
version: "0.5"

log_level: debug
log_length: 1000

processes:
procA:
command: |
trap 'echo "A: exit"' SIGTERM
echo "A: starting"
sleep 3

procB:
command: |
trap 'echo "B: exit"' SIGTERM
echo "B: starting"
sleep 3
depends_on:
procA:
condition: process_started

procC:
command: |
trap 'echo "C: exit"' SIGTERM
echo "C: starting"
sleep 3
depends_on:
procB:
condition: process_started
18 changes: 12 additions & 6 deletions src/app/project_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package app
import "github.com/f1bonacc1/process-compose/src/types"

type ProjectOpts struct {
project *types.Project
processesToRun []string
noDeps bool
mainProcess string
mainProcessArgs []string
isTuiOn bool
project *types.Project
processesToRun []string
noDeps bool
mainProcess string
mainProcessArgs []string
isTuiOn bool
isOrderedShutDown bool
}

func (p *ProjectOpts) WithProject(project *types.Project) *ProjectOpts {
Expand Down Expand Up @@ -39,3 +40,8 @@ func (p *ProjectOpts) WithIsTuiOn(isTuiOn bool) *ProjectOpts {
p.isTuiOn = isTuiOn
return p
}

func (p *ProjectOpts) WithOrderedShutDown(isOrderedShutDown bool) *ProjectOpts {
p.isOrderedShutDown = isOrderedShutDown
return p
}
155 changes: 121 additions & 34 deletions src/app/project_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,30 @@ import (
"os"
"os/user"
"runtime"
"slices"
"sync"
"time"

"github.com/rs/zerolog/log"
)

type ProjectRunner struct {
procConfMutex sync.Mutex
project *types.Project
logsMutex sync.Mutex
processLogs map[string]*pclog.ProcessLogBuffer
statesMutex sync.Mutex
processStates map[string]*types.ProcessState
runProcMutex sync.Mutex
runningProcesses map[string]*Process
logger pclog.PcLogger
waitGroup sync.WaitGroup
exitCode int
projectState *types.ProjectState
mainProcess string
mainProcessArgs []string
isTuiOn bool
procConfMutex sync.Mutex
project *types.Project
logsMutex sync.Mutex
processLogs map[string]*pclog.ProcessLogBuffer
statesMutex sync.Mutex
processStates map[string]*types.ProcessState
runProcMutex sync.Mutex
runningProcesses map[string]*Process
logger pclog.PcLogger
waitGroup sync.WaitGroup
exitCode int
projectState *types.ProjectState
mainProcess string
mainProcessArgs []string
isTuiOn bool
isOrderedShutDown bool
}

func (p *ProjectRunner) GetLexicographicProcessNames() ([]string, error) {
Expand Down Expand Up @@ -322,27 +324,111 @@ func (p *ProjectRunner) GetProcessPorts(name string) (*types.ProcessPorts, error
return ports, nil
}

func (p *ProjectRunner) runningProcessesReverseDependencies() map[string]map[string]*Process {
reverseDependencies := make(map[string]map[string]*Process)

// `p.runProcMutex` lock is assumed to have been acquired when calling
// this function. It is currently called by `ShutDownProject()`.
for _, process := range p.runningProcesses {
for k := range process.procConf.DependsOn {
if runningProc, ok := p.runningProcesses[k]; ok {
if _, ok := reverseDependencies[runningProc.getName()]; !ok {
dep := make(map[string]*Process)
dep[process.getName()] = process
reverseDependencies[runningProc.getName()] = dep
}
} else {
continue
}
}
}

return reverseDependencies
}

func (p *ProjectRunner) shutDownInOrder(wg *sync.WaitGroup, shutdownOrder []*Process) {
reverseDependencies := p.runningProcessesReverseDependencies()
for _, process := range shutdownOrder {
wg.Add(1)
go func(proc *Process) {
defer wg.Done()
waitForDepsWg := sync.WaitGroup{}
if revDeps, ok := reverseDependencies[proc.getName()]; ok {
for _, runningProc := range revDeps {
waitForDepsWg.Add(1)
go func(pr *Process) {
pr.waitForCompletion()
waitForDepsWg.Done()
}(runningProc)
}
}
waitForDepsWg.Wait()
log.Debug().Msgf("[%s]: waited for all dependencies to shut down", proc.getName())

err := proc.shutDown()
if err != nil {
log.Err(err).Msgf("failed to shutdown %s", proc.getName())
return
}
proc.waitForCompletion()
}(process)
}
}

func (p *ProjectRunner) shutDownAndWait(shutdownOrder []*Process) {
wg := sync.WaitGroup{}
if p.isOrderedShutDown {
p.shutDownInOrder(&wg, shutdownOrder)
} else {
for _, proc := range shutdownOrder {
err := proc.shutDown()
if err != nil {
log.Err(err).Msgf("failed to shutdown %s", proc.getName())
continue
}
wg.Add(1)
go func(pr *Process) {
pr.waitForCompletion()
wg.Done()
}(proc)
}
}

wg.Wait()
}

func (p *ProjectRunner) ShutDownProject() error {
p.runProcMutex.Lock()
defer p.runProcMutex.Unlock()
runProc := p.runningProcesses
for _, proc := range runProc {
proc.prepareForShutDown()
}
wg := sync.WaitGroup{}
for _, proc := range runProc {
err := proc.shutDown()

shutdownOrder := []*Process{}
if p.isOrderedShutDown {
err := p.project.WithProcesses([]string{}, func(process types.ProcessConfig) error {
if runningProc, ok := p.runningProcesses[process.ReplicaName]; ok {
shutdownOrder = append(shutdownOrder, runningProc)
}
return nil
})
if err != nil {
log.Err(err).Msgf("failed to shutdown %s", proc.getName())
continue
log.Error().Msgf("Failed to build project run order: %s", err.Error())
}
slices.Reverse(shutdownOrder)
} else {
for _, proc := range p.runningProcesses {
shutdownOrder = append(shutdownOrder, proc)
}
wg.Add(1)
go func(pr *Process) {
pr.waitForCompletion()
wg.Done()
}(proc)
}
wg.Wait()

var nameOrder []string
for _, v := range shutdownOrder {
nameOrder = append(nameOrder, v.getName())
}
log.Debug().Msgf("Shutting down %d processes. Order: %q", len(shutdownOrder), nameOrder)
for _, proc := range shutdownOrder {
proc.prepareForShutDown()
}

p.shutDownAndWait(shutdownOrder)
return nil
}

Expand Down Expand Up @@ -644,10 +730,11 @@ func NewProjectRunner(opts *ProjectOpts) (*ProjectRunner, error) {
username = current.Username
}
runner := &ProjectRunner{
project: opts.project,
mainProcess: opts.mainProcess,
mainProcessArgs: opts.mainProcessArgs,
isTuiOn: opts.isTuiOn,
project: opts.project,
mainProcess: opts.mainProcess,
mainProcessArgs: opts.mainProcessArgs,
isTuiOn: opts.isTuiOn,
isOrderedShutDown: opts.isOrderedShutDown,
projectState: &types.ProjectState{
FileNames: opts.project.FileNames,
StartTime: time.Now(),
Expand Down
95 changes: 95 additions & 0 deletions src/app/system_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package app

import (
"bufio"
"github.com/f1bonacc1/process-compose/src/loader"
"github.com/f1bonacc1/process-compose/src/types"
"os"
"path/filepath"
"reflect"
"slices"
"strings"
"testing"
"time"
)
Expand Down Expand Up @@ -406,3 +410,94 @@ func TestSystem_TestProcListToRun(t *testing.T) {
}
})
}

func TestSystem_TestProcListShutsDownInOrder(t *testing.T) {
fixture1 := filepath.Join("..", "..", "fixtures-code", "process-compose-shutdown-inorder.yaml")
t.Run("Single Proc with deps", func(t *testing.T) {

project, err := loader.Load(&loader.LoaderOptions{
FileNames: []string{fixture1},
})
if err != nil {
t.Errorf(err.Error())
return
}
numProc := len(project.Processes)
runner, err := NewProjectRunner(&ProjectOpts{
project: project,
processesToRun: []string{},
mainProcessArgs: []string{},
isOrderedShutDown: true,
})
if err != nil {
t.Errorf(err.Error())
return
}
if len(runner.project.Processes) != numProc {
t.Errorf("should have %d processes", numProc)
}
for name, proc := range runner.project.Processes {
if proc.Disabled {
t.Errorf("process %s is disabled", name)
}
}
file, err := os.CreateTemp("/tmp", "pc_log.*.log")
defer os.Remove(file.Name())
project.LogLocation = file.Name()
project.LoggerConfig = &types.LoggerConfig{
FieldsOrder: []string{"message"},
DisableJSON: true,
TimestampFormat: "",
NoMetadata: true,
FlushEachLine: true,
NoColor: true,
}
go runner.Run()
time.Sleep(10 * time.Millisecond)
states, err := runner.GetProcessesState()
if err != nil {
t.Errorf(err.Error())
return
}
want := 3
if len(states.States) != want {
t.Errorf("len(states.States) = %d, want %d", len(states.States), want)
}

time.Sleep(10 * time.Millisecond)
err = runner.ShutDownProject()
if err != nil {
t.Errorf(err.Error())
return
}
states, err = runner.GetProcessesState()
if err != nil {
t.Errorf(err.Error())
return
}
runningProcesses := 0
for _, processState := range states.States {
if processState.IsRunning {
runningProcesses++
}
}
want = 0
if runningProcesses != want {
t.Errorf("runningProcesses = %d, want %d", runningProcesses, want)
}
//read file and validate the shutdown order
scanner := bufio.NewScanner(file)
order := make([]string, 0)
for scanner.Scan() {
line := scanner.Text()
if strings.Contains(line, "exit") {
order = append(order, line)
}
}
wantOrder := []string{"C: exit", "B: exit", "A: exit"}
if !slices.Equal(order, wantOrder) {
t.Errorf("content = %v, want %v", order, wantOrder)
return
}
})
}
1 change: 1 addition & 0 deletions src/cmd/project_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func getProjectRunner(process []string, noDeps bool, mainProcess string, mainPro
WithMainProcessArgs(mainProcessArgs).
WithProject(project).
WithProcessesToRun(process).
WithOrderedShutDown(*pcFlags.IsOrderedShutDown).
WithNoDeps(noDeps),
)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions src/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func init() {
rootCmd.Flags().BoolVarP(pcFlags.Headless, "tui", "t", *pcFlags.Headless, "enable TUI (-t=false) (env: "+config.EnvVarNameTui+")")
rootCmd.PersistentFlags().BoolVar(pcFlags.KeepTuiOn, "keep-tui", *pcFlags.KeepTuiOn, "keep TUI running even after all processes exit")
rootCmd.PersistentFlags().BoolVar(pcFlags.NoServer, "no-server", *pcFlags.NoServer, "disable HTTP server (env: "+config.EnvVarNameNoServer+")")
rootCmd.PersistentFlags().BoolVar(pcFlags.IsOrderedShutDown, "ordered-shutdown", *pcFlags.IsOrderedShutDown, "shut down processes in reverse dependency order")
rootCmd.Flags().BoolVarP(pcFlags.HideDisabled, "hide-disabled", "d", *pcFlags.HideDisabled, "hide disabled processes")
rootCmd.Flags().IntVarP(pcFlags.RefreshRate, "ref-rate", "r", *pcFlags.RefreshRate, "TUI refresh rate in seconds")
rootCmd.PersistentFlags().IntVarP(pcFlags.PortNum, "port", "p", *pcFlags.PortNum, "port number (env: "+config.EnvVarNamePort+")")
Expand Down
Loading
Loading