Skip to content

Commit

Permalink
fix(tests): remove race condition in service manager. Fixes #89
Browse files Browse the repository at this point in the history
  • Loading branch information
mefellows committed Jul 14, 2018
1 parent 3ed92b9 commit 6883cf4
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 24 deletions.
37 changes: 31 additions & 6 deletions client/service_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"log"
"os"
"os/exec"
"sync"
"time"
)

// ServiceManager is the default implementation of the Service interface.
type ServiceManager struct {
Cmd string
processes map[int]*exec.Cmd
processMap processMap
Args []string
Env []string
commandCompleteChan chan *exec.Cmd
Expand All @@ -24,7 +25,7 @@ func (s *ServiceManager) Setup() {

s.commandCreatedChan = make(chan *exec.Cmd)
s.commandCompleteChan = make(chan *exec.Cmd)
s.processes = make(map[int]*exec.Cmd)
s.processMap = processMap{processes: make(map[int]*exec.Cmd)}

// Listen for service create/kill
go s.addServiceMonitor()
Expand All @@ -38,7 +39,7 @@ func (s *ServiceManager) addServiceMonitor() {
select {
case p := <-s.commandCreatedChan:
if p != nil && p.Process != nil {
s.processes[p.Process.Pid] = p
s.processMap.Set(p.Process.Pid, p)
}
}
}
Expand All @@ -53,7 +54,7 @@ func (s *ServiceManager) removeServiceMonitor() {
case p = <-s.commandCompleteChan:
if p != nil && p.Process != nil {
p.Process.Signal(os.Interrupt)
delete(s.processes, p.Process.Pid)
s.processMap.Delete(p.Process.Pid)
}
}
}
Expand All @@ -62,7 +63,7 @@ func (s *ServiceManager) removeServiceMonitor() {
// Stop a Service and returns the exit status.
func (s *ServiceManager) Stop(pid int) (bool, error) {
log.Println("[DEBUG] stopping service with pid", pid)
cmd := s.processes[pid]
cmd := s.processMap.Get(pid)

// Remove service from registry
go func() {
Expand Down Expand Up @@ -96,7 +97,7 @@ func (s *ServiceManager) Stop(pid int) (bool, error) {
// List all Service PIDs.
func (s *ServiceManager) List() map[int]*exec.Cmd {
log.Println("[DEBUG] listing services")
return s.processes
return s.processMap.processes
}

// Command executes the command
Expand Down Expand Up @@ -151,3 +152,27 @@ func (s *ServiceManager) Start() *exec.Cmd {

return cmd
}

type processMap struct {
sync.RWMutex
processes map[int]*exec.Cmd
}

func (pm *processMap) Get(k int) *exec.Cmd {
pm.RLock()
defer pm.RUnlock()
v, _ := pm.processes[k]
return v
}

func (pm *processMap) Set(k int, v *exec.Cmd) {
pm.Lock()
defer pm.Unlock()
pm.processes[k] = v
}

func (pm *processMap) Delete(k int) {
pm.Lock()
defer pm.Unlock()
delete(pm.processes, k)
}
56 changes: 38 additions & 18 deletions client/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,25 @@ func TestServiceManager_removeServiceMonitor(t *testing.T) {
mgr := createServiceManager()
cmd := fakeExecCommand("", true, "")
cmd.Start()
mgr.processes = map[int]*exec.Cmd{
mgr.processMap.processes = map[int]*exec.Cmd{
cmd.Process.Pid: cmd,
}

mgr.commandCompleteChan <- cmd
var timeout = time.After(channelTimeout)
for {
mgr.processMap.Lock()
defer mgr.processMap.Unlock()

select {
case <-time.After(10 * time.Millisecond):
if len(mgr.processes) == 0 {
if len(mgr.processMap.processes) == 0 {
return
}
case <-timeout:
if len(mgr.processes) != 0 {
if len(mgr.processMap.processes) != 0 {
t.Fatalf(`Expected 1 command to be removed from the queue. Have %d
Timed out after 500millis`, len(mgr.processes))
Timed out after 500millis`, len(mgr.processMap.processes))
}
}
}
Expand All @@ -77,15 +80,20 @@ func TestServiceManager_addServiceMonitor(t *testing.T) {
var timeout = time.After(channelTimeout)

for {

select {
case <-time.After(10 * time.Millisecond):
if len(mgr.processes) == 1 {
mgr.processMap.Lock()
defer mgr.processMap.Unlock()
if len(mgr.processMap.processes) == 1 {
return
}
case <-timeout:
if len(mgr.processes) != 1 {
mgr.processMap.Lock()
defer mgr.processMap.Unlock()
if len(mgr.processMap.processes) != 1 {
t.Fatalf(`Expected 1 command to be added to the queue, but got: %d.
Timed out after 500millis`, len(mgr.processes))
Timed out after 500millis`, len(mgr.processMap.processes))
}
return
}
Expand All @@ -99,16 +107,18 @@ func TestServiceManager_addServiceMonitorWithDeadJob(t *testing.T) {
var timeout = time.After(channelTimeout)

for {

select {
case <-time.After(10 * time.Millisecond):
if len(mgr.processes) != 0 {

if len(mgr.processMap.processes) != 0 {
t.Fatalf(`Expected 0 command to be added to the queue, but got: %d.
Timed out after 5 attempts`, len(mgr.processes))
Timed out after 5 attempts`, len(mgr.processMap.processes))
}
case <-timeout:
if len(mgr.processes) != 0 {
if len(mgr.processMap.processes) != 0 {
t.Fatalf(`Expected 0 command to be added to the queue, but got: %d.
Timed out after 50millis`, len(mgr.processes))
Timed out after 50millis`, len(mgr.processMap.processes))
}
return
}
Expand All @@ -119,20 +129,23 @@ func TestServiceManager_Stop(t *testing.T) {
mgr := createServiceManager()
cmd := fakeExecCommand("", true, "")
cmd.Start()
mgr.processes = map[int]*exec.Cmd{
mgr.processMap.processes = map[int]*exec.Cmd{
cmd.Process.Pid: cmd,
}

mgr.Stop(cmd.Process.Pid)
var timeout = time.After(channelTimeout)
for {
mgr.processMap.Lock()
defer mgr.processMap.Unlock()

select {
case <-time.After(10 * time.Millisecond):
if len(mgr.processes) == 0 {
if len(mgr.processMap.processes) == 0 {
return
}
case <-timeout:
if len(mgr.processes) != 0 {
if len(mgr.processMap.processes) != 0 {
t.Fatalf(`Expected 1 command to be removed from the queue.
Timed out after 500millis`)
}
Expand All @@ -148,7 +161,9 @@ func TestServiceManager_List(t *testing.T) {
processes := map[int]*exec.Cmd{
cmd.Process.Pid: cmd,
}
mgr.processes = processes
mgr.processMap.Lock()
mgr.processMap.processes = processes
mgr.processMap.Unlock()

if !reflect.DeepEqual(processes, mgr.List()) {
t.Fatalf("Expected mgr.List() to equal processes")
Expand All @@ -161,15 +176,20 @@ func TestServiceManager_Start(t *testing.T) {
var timeout = time.After(channelTimeout)

for {

select {
case <-time.After(10 * time.Millisecond):
if len(mgr.processes) == 1 {
mgr.processMap.Lock()
if len(mgr.processMap.processes) == 1 {
mgr.processMap.Unlock()
return
}
case <-timeout:
if len(mgr.processes) != 1 {
mgr.processMap.Lock()
defer mgr.processMap.Unlock()
if len(mgr.processMap.processes) != 1 {
t.Fatalf(`Expected 1 command to be added to the queue, but got: %d.
Timed out after 500millis`, len(mgr.processes))
Timed out after 500millis`, len(mgr.processMap.processes))
}
return
}
Expand Down

0 comments on commit 6883cf4

Please sign in to comment.