forked from stephenjlovell/load_balancer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
balancer.go
105 lines (93 loc) · 2.41 KB
/
balancer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package main
import (
"container/heap"
"fmt"
)
// Balancer routes incoming requests to a pool of workers that is managed
// as a heap through container/heap.
type Balancer struct {
	// pool holds the workers; dispatch pops from it and completed
	// re-orders it as worker load changes.
	pool Pool
	// name is the label shown by print.
	name string
	// done receives a worker each time that worker finishes a request.
	done chan *Worker
	// work is the channel the balance loop receives incoming requests from.
	work chan Request
}
// BalancerOpts groups the settings that MakeBalancer reads when it builds
// a Balancer and its workers.
type BalancerOpts struct {
	// Name becomes the balancer's name.
	Name string
	// NumWorkers is the number of workers created in the pool.
	NumWorkers int
	// IsPipeline is copied to every worker's isPipelined field.
	IsPipeline bool
	// NextPipeline is copied to every worker's forwardRequest field.
	// NOTE(review): presumably the input channel of the next stage when
	// IsPipeline is set; confirm against the Worker implementation.
	NextPipeline chan Request
	// WorkChannel is the channel the balancer receives requests from.
	WorkChannel chan Request
	// Fn is copied to every worker's fn field.
	// NOTE(review): presumably the function a worker applies to each
	// request's data; confirm against the Worker implementation.
	Fn func(data *interface{}) interface{}
}
// start calls work once on every worker in the pool, passing each the
// balancer's shared done channel.
//
// NOTE(review): Worker.work is defined outside this block; presumably it
// launches the worker's own processing loop. Confirm before relying on
// start returning promptly.
func (b *Balancer) start() {
	workers := b.pool
	for i := range workers {
		workers[i].work(b.done)
	}
}
// balance launches the balancer's event loop in a background goroutine,
// prints a start-up message, and returns.
//
// The loop waits on two channels: a request arriving on b.work is handed
// to dispatch, and a worker arriving on b.done is handed to completed.
// Both handlers run on this one goroutine. The loop has no exit
// condition, so the goroutine lives for the rest of the process.
func (b *Balancer) balance() {
	go func() {
		for {
			select {
			case req := <-b.work:
				// A new request arrived: forward it to a worker.
				b.dispatch(req)
			case w := <-b.done:
				// A worker finished one request: update its load.
				b.completed(w)
			}
		}
	}()
	fmt.Println("Started the load balancer")
}
// dispatch sends req to the worker at the top of the pool's heap and
// records the extra load on that worker.
//
// The top of the heap is the most lightly loaded worker, assuming Pool
// orders workers by pending count (Pool is defined outside this block).
// The send blocks while the chosen worker's request channel is full.
func (b *Balancer) dispatch(req Request) {
	// Take the worker out of the heap while its load changes.
	top := heap.Pop(&b.pool)
	target := top.(*Worker)

	target.requests <- req
	target.pending++

	// Re-insert so the heap positions the worker by its new load.
	heap.Push(&b.pool, target)
}
// completed records that w has finished one request and restores the
// pool's heap ordering around w's new load.
//
// w must currently be in the pool, with w.index holding its position
// there. heap.Fix re-sifts the element in place; container/heap documents
// it as equivalent to, but cheaper than, removing the element and pushing
// it back.
func (b *Balancer) completed(w *Worker) {
	w.pending--
	heap.Fix(&b.pool, w.index)
}
// print writes a one-line load summary to standard output: a newline and
// the balancer's name, then each worker's pending count in pool order,
// then a "|" followed by the sum of those counts. No trailing newline is
// written.
func (b *Balancer) print() {
	fmt.Printf("\n %s ", b.name)

	sum := 0
	workers := b.pool
	for i := range workers {
		load := workers[i].pending
		fmt.Printf("%d ", load)
		sum += load
	}

	fmt.Printf("| %d ", sum)
}
// new_balancer builds a Balancer with nworker workers that takes its
// requests from work.
//
// Each worker gets its own request channel (buffer 100) and its initial
// heap index; the balancer's done channel is buffered to 50000. The
// balancer is returned unstarted: this constructor calls neither start
// nor balance, and it leaves the name empty.
//
// The work channel is stored on the balancer. It used to be dropped,
// which left b.work nil so the balance loop could never receive a request.
func new_balancer(nworker int, work chan Request) *Balancer {
	b := &Balancer{
		done: make(chan *Worker, 50000),
		pool: make(Pool, nworker),
		work: work,
	}
	for i := range b.pool {
		b.pool[i] = &Worker{
			// Each worker needs its own channel on which to receive
			// work from the load balancer.
			requests: make(chan Request, 100),
			index:    i,
		}
	}
	heap.Init(&b.pool)
	return b
}
// MakeBalancer creates a load balancer from the provided options and
// starts its workers.
//
// It builds opts.NumWorkers workers, each with its own request channel
// (buffer 50000) and a copy of the pipeline settings and Fn from opts,
// then calls start on the new balancer. It does not call balance, so the
// dispatch loop is not running when MakeBalancer returns. opts must be
// non-nil.
func MakeBalancer(opts *BalancerOpts) *Balancer {
	workers := make(Pool, opts.NumWorkers)
	for slot := range workers {
		workers[slot] = &Worker{
			// Each worker needs its own channel on which to receive
			// work from the load balancer.
			requests:       make(chan Request, 50000),
			index:          slot,
			isPipelined:    opts.IsPipeline,
			forwardRequest: opts.NextPipeline,
			fn:             opts.Fn,
		}
	}

	lb := &Balancer{
		done: make(chan *Worker, 50000),
		pool: workers,
		work: opts.WorkChannel,
		name: opts.Name,
	}
	heap.Init(&lb.pool)
	lb.start()
	return lb
}