-
Notifications
You must be signed in to change notification settings - Fork 5
/
dispatcher.go
174 lines (143 loc) · 4.52 KB
/
dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package raccoon
import (
"sync"
"fmt"
log "github.com/Sirupsen/logrus"
"time"
)
//Dispatcher is the interface to implement new dispatching strategies in the app.
type Dispatcher interface {
	// Dispatch runs every task of every job in js against the hosts of the
	// job's cluster. Implementations decide the concurrency model (see
	// SimpleDispatcher, WorkerPoolDispatcher and SequentialDispatcher) and
	// block until dispatching is finished.
	Dispatch(js []Job)
}
//WorkerPoolDispatcher is a dispatcher that acts as a worker pool. It won't
//maintain more connections to hosts at the same time than the specified number
//in the Workers field
type WorkerPoolDispatcher struct {
	// Workers is the maximum number of hosts handled concurrently.
	Workers int
	// Embedded WaitGroup tracking in-flight (job, host) executions; Dispatch
	// adds one count per host and waits for all of them.
	sync.WaitGroup
}
// dispatchingJob pairs a job with one concrete host of its cluster so the pair
// can travel through the worker channels as a single unit of work.
type dispatchingJob struct {
	job  Job
	host Host
}
//Dispatch will create the number of workers specified in the Workers field,
//each draining a shared jobs channel, and then feed every (job, host) pair of
//every job through that channel. It blocks until all hosts have been processed
//and every worker goroutine has exited.
//
//NOTE(review): the previous implementation spawned dispatcher/worker goroutines
//with no exit condition (permanent leak) and closed freeWorkersPool while
//workers could still be sending into it — a "send on closed channel" panic
//race. The pool is now managed inline: the jobs channel is closed exactly once,
//after the last job has been delivered, which cleanly terminates every worker.
func (w *WorkerPoolDispatcher) Dispatch(jobs []Job) {
	log.WithFields(log.Fields{
		"package": "dispatcher",
	}).Info(fmt.Sprintf("%d workers pool selected", w.Workers))

	// Guard against a zero/negative pool size, which would otherwise deadlock
	// on the first send below (the old code deadlocked here too).
	workers := w.Workers
	if workers < 1 {
		workers = 1
	}

	jobsCh := make(chan dispatchingJob)

	// Launch the workers. Each drains jobsCh until it is closed, then exits,
	// so no goroutine outlives this call.
	for i := 0; i < workers; i++ {
		go func() {
			d := SequentialDispatcher{}
			for dj := range jobsCh {
				d.executeJobOnHost(dj.job, dj.host)
				w.Done()
			}
		}()
	}

	// Feed every (job, host) pair to the pool.
	for _, job := range jobs {
		w.Add(len(job.Cluster.Hosts))
		for _, host := range job.Cluster.Hosts {
			jobsCh <- dispatchingJob{job, host}
		}
	}

	// No more work: closing the channel ends each worker's range loop.
	close(jobsCh)
	w.Wait()
}
// dispatcher forwards each incoming job to the first idle worker: it blocks
// until a job arrives on dispatchingJobCh, then blocks until some worker has
// re-registered its private channel in freeWorkersPool, and hands the job over.
//
// NOTE(review): this loop has no exit condition, so the goroutine running it
// leaks after Dispatch returns. Once Dispatch closes both channels, the
// receives yield zero values (and, when the pool buffer drains, a nil worker
// channel), leaving this goroutine blocked forever on a send to a nil channel.
// Fixing it requires a coordinated shutdown protocol with Dispatch and worker.
func (w *WorkerPoolDispatcher) dispatcher(dispatchingJobCh chan dispatchingJob, freeWorkersPool chan chan dispatchingJob) {
	for {
		dispatchingJob := <-dispatchingJobCh
		worker := <-freeWorkersPool
		worker <- dispatchingJob
	}
}
// worker processes jobs arriving on its private channel dispatchingJobCh and,
// after finishing each one, re-registers that channel in freeWorkersPool so
// the dispatcher can hand it the next job. The loop never returns on its own;
// the goroutine's lifetime is bounded only by the process (see dispatcher's
// review note about shutdown).
func (w *WorkerPoolDispatcher) worker(dispatchingJobCh chan dispatchingJob, freeWorkersPool chan chan dispatchingJob) {
	for {
		dj := <-dispatchingJobCh
		// Use the package's structured logger instead of a raw fmt.Printf so
		// worker output is consistent with the rest of the dispatcher logs.
		log.WithFields(log.Fields{
			"host":    dj.host.IP,
			"package": "dispatcher",
		}).Info(fmt.Sprintf("Receiving host %s on ssh port %d", dj.host.IP, dj.host.SSHPort))

		d := SequentialDispatcher{}
		d.executeJobOnHost(dj.job, dj.host)

		// Re-register BEFORE calling Done: by the time Dispatch's Wait
		// returns, no worker is still mid-send into freeWorkersPool, which
		// removes the "send on closed channel" race with Dispatch's close.
		freeWorkersPool <- dispatchingJobCh
		w.Done()
	}
}
//SimpleDispatcher is the dispatching strategy to use when the number of hosts
//isn't very high. It will open a new SSH connection to each host at the same
//time since the beginning so, if you are having performance issues, try the
//workers pool or the sequential dispatcher to see if they get solved.
//SimpleDispatcher is the default dispatching strategy
type SimpleDispatcher struct {
	// Embedded WaitGroup: Dispatch adds one count per (job, host) goroutine
	// and executeJobOnHost signals completion.
	sync.WaitGroup
}
//Dispatch launches one goroutine per host of every job, running the job's
//tasks on all hosts concurrently, and blocks until they have all reported
//completion through the embedded WaitGroup.
func (s *SimpleDispatcher) Dispatch(jobs []Job) {
	for _, job := range jobs {
		hosts := job.Cluster.Hosts
		// Register the whole batch up front; each goroutine calls Done via
		// executeJobOnHost when it finishes.
		s.Add(len(hosts))
		for i := range hosts {
			go s.executeJobOnHost(job, hosts[i])
		}
	}
	s.Wait()
	// FIXME: for some reason dispatching sometimes exits before every task
	// has finished, hence this grace period.
	time.Sleep(3 * time.Second)
}
// executeJobOnHost opens a connection to host h, runs every command of the
// job's task on it in order, and closes the connection again. It always
// decrements the embedded WaitGroup counter, on every exit path.
func (s *SimpleDispatcher) executeJobOnHost(j Job, h Host) {
	// BUGFIX: the previous code returned on InitializeNode failure WITHOUT
	// calling Done, so a single unreachable host left Dispatch blocked
	// forever in Wait. defer guarantees Done on every path.
	defer s.Done()
	err := h.InitializeNode()
	if err != nil {
		log.WithFields(log.Fields{
			"host":    h.IP,
			"package": "dispatcher",
		}).Error("Error initializing node: " + err.Error())
		return
	}
	for _, instruction := range j.Task.Commands {
		instruction.Execute(h)
	}
	// Closing failures are non-fatal: the commands already ran, so only warn.
	err = h.CloseNode()
	if err != nil {
		log.WithFields(log.Fields{
			"host":    h.IP,
			"package": "dispatcher",
		}).Warn("Error closing session: " + err.Error())
	}
}
//SequentialDispatcher is a dispatching strategy that doesn't use any concurrent
//approach. It will execute each command on each host sequentially and waits for
//each to finish before starting the next.
type SequentialDispatcher struct{}
//Dispatch walks every job and, for each one, runs its tasks on each host of
//the cluster one after another — no goroutines, no channels.
func (s *SequentialDispatcher) Dispatch(jobs []Job) {
	for _, job := range jobs {
		hosts := job.Cluster.Hosts
		for i := range hosts {
			s.executeJobOnHost(job, hosts[i])
		}
	}
	// FIXME: for some reason dispatching sometimes exits before every task
	// has finished, hence this grace period.
	time.Sleep(3 * time.Second)
}
// executeJobOnHost connects to host h, executes every command of the job's
// task in order, and closes the connection. Initialization failure aborts the
// host; a close failure only produces a warning since the work already ran.
func (s *SequentialDispatcher) executeJobOnHost(j Job, h Host) {
	if err := h.InitializeNode(); err != nil {
		log.WithFields(log.Fields{
			"host":    h.IP,
			"package": "dispatcher",
		}).Error("Error initializing node: " + err.Error())
		return
	}

	for _, cmd := range j.Task.Commands {
		cmd.Execute(h)
	}

	if err := h.CloseNode(); err != nil {
		log.WithFields(log.Fields{
			"host":    h.IP,
			"package": "dispatcher",
		}).Warn("Error closing session: " + err.Error())
	}
}