Skip to content

Commit

Permalink
refactor: smaller functions
Browse files Browse the repository at this point in the history
  • Loading branch information
anmonteiro committed Mar 2, 2024
1 parent 6191961 commit 8a6e33b
Showing 1 changed file with 52 additions and 45 deletions.
97 changes: 52 additions & 45 deletions src/app/project_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,57 @@ func (p *ProjectRunner) runningProcessesReverseDependencies() map[string]map[str
return reverseDependencies
}

func (p *ProjectRunner) shutDownInOrder(wg *sync.WaitGroup, shutdownOrder []*Process) {
reverseDependencies := p.runningProcessesReverseDependencies()
for _, process := range shutdownOrder {
wg.Add(1)
go func(proc *Process) {
defer wg.Done()
waitForDepsWg := sync.WaitGroup{}
if revDeps, ok := reverseDependencies[proc.getName()]; ok {
for _, runningProc := range revDeps {
waitForDepsWg.Add(1)
go func(pr *Process) {
pr.waitForCompletion()
waitForDepsWg.Done()
}(runningProc)
}
}
waitForDepsWg.Wait()
log.Debug().Msgf("[%s]: waited for all dependencies to shut down", proc.getName())

err := proc.shutDown()
if err != nil {
log.Err(err).Msgf("failed to shutdown %s", proc.getName())
return
}
proc.waitForCompletion()
}(process)
}
}

func (p *ProjectRunner) shutDownAndWait(shutdownOrder []*Process) {
wg := sync.WaitGroup{}
if p.isOrderedShutDown {
p.shutDownInOrder(&wg, shutdownOrder)
} else {
for _, proc := range shutdownOrder {
err := proc.shutDown()
if err != nil {
log.Err(err).Msgf("failed to shutdown %s", proc.getName())
continue
}
wg.Add(1)
go func(pr *Process) {
pr.waitForCompletion()
wg.Done()
}(proc)
}
}

wg.Wait()
}

func (p *ProjectRunner) ShutDownProject() error {
p.runProcMutex.Lock()
defer p.runProcMutex.Unlock()
Expand Down Expand Up @@ -377,51 +428,7 @@ func (p *ProjectRunner) ShutDownProject() error {
proc.prepareForShutDown()
}

wg := sync.WaitGroup{}
if p.isOrderedShutDown {
reverseDependencies := p.runningProcessesReverseDependencies()
for _, process := range shutdownOrder {
wg.Add(1)
go func(proc *Process) {
defer wg.Done()
waitForDepsWg := sync.WaitGroup{}
if revDeps, ok := reverseDependencies[proc.getName()]; ok {
for _, runningProc := range revDeps {
waitForDepsWg.Add(1)
go func(pr *Process) {
pr.waitForCompletion()
waitForDepsWg.Done()
}(runningProc)
}
}
waitForDepsWg.Wait()
log.Debug().Msgf("[%s]: waited for all dependencies to shut down", proc.getName())

err := proc.shutDown()
if err != nil {
log.Err(err).Msgf("failed to shutdown %s", proc.getName())
return
}
proc.waitForCompletion()
}(process)
}
} else {
for _, proc := range shutdownOrder {
err := proc.shutDown()
if err != nil {
log.Err(err).Msgf("failed to shutdown %s", proc.getName())
continue
}
wg.Add(1)
go func(pr *Process) {
pr.waitForCompletion()
wg.Done()
}(proc)
}

}

wg.Wait()
p.shutDownAndWait(shutdownOrder)
return nil
}

Expand Down

0 comments on commit 8a6e33b

Please sign in to comment.