-
Notifications
You must be signed in to change notification settings - Fork 6
/
sender.go
93 lines (86 loc) · 1.84 KB
/
sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package cat
import (
"bytes"
"github.com/ctripcorp/ghost/pool"
"time"
)
var (
sender_transaction_channel chan Message
sender_max_batch_size int
sender_pool pool.Pool
)
//cat_sender_init is internally used and only called by Cat_init_if.
func cat_sender_init() {
sender_transaction_channel = make(chan Message, 1<<10)
sender_max_batch_size = 1 << 8
sender_pool, _ = pool.NewBlockingPool(1, 1, 90 * time.Second, CONN_FACTORY)
go sender_run()
}
// sender_run loops forever, invoking sender_collect on every iteration.
// Whenever sender_collect reports true (the queue was not full enough to
// fill a batch), the loop pauses for 65536 microseconds before trying again;
// when it reports false the next pass starts immediately.
// It is started as a single goroutine by cat_sender_init and never returns.
func sender_run() {
	// Shift and multiplication share a precedence level in Go, so the
	// parentheses only spell out how the original expression already parsed.
	const idlePause = (1 << 16) * time.Microsecond

	for {
		if idle := sender_collect(); !idle {
			continue
		}
		time.Sleep(idlePause)
	}
}
// sender_collect moves up to sender_max_batch_size messages from
// sender_transaction_channel into a fresh batch without blocking, and hands
// the batch to sender_encode when there is something to encode.
//
// It returns false only when a full batch was taken, which signals the
// caller that the queue looks busy; in every other case it returns true.
func sender_collect() bool {
	batch := make(chan Message, sender_max_batch_size)
	taken := 0

	// Stop as soon as the queue has nothing ready or the batch is full.
	for drained := false; !drained && taken < sender_max_batch_size; {
		select {
		case m := <-sender_transaction_channel:
			batch <- m
			taken++
		default:
			drained = true
		}
	}
	close(batch)

	full := taken == sender_max_batch_size
	if full || taken > 0 {
		sender_encode(batch, taken)
	}
	return !full
}
// sender_encode turns every message read from messages into one byte frame
// and passes the frames to sender_send on a new goroutine.
//
// Each frame starts with four bytes that are filled with
// int32tobytes(len(frame)-4), followed by the encoded header and the encoded
// message. A message whose header fails to encode is skipped. count sizes the
// frame channel so that queuing the frames never blocks.
func sender_encode(messages <-chan Message, count int) {
	frames := make(chan []byte, count)

	for msg := range messages {
		// Reserve four zero bytes up front for the length prefix.
		packet := bytes.NewBuffer(make([]byte, 4))

		var headerErr error = NewHeader().Encode(packet)
		if headerErr != nil {
			continue
		}
		msg.Encode(packet)

		raw := packet.Bytes()
		size := int32tobytes(int32(packet.Len() - 4))
		// Overwrite the reserved prefix with the payload length.
		for i := 0; i < 4; i++ {
			raw[i] = size[i]
		}
		frames <- raw
	}
	close(frames)

	go sender_send(frames)
}
// sender_send takes a connection from sender_pool and writes every frame
// received from datas to it, in order. It returns silently if no connection
// can be obtained, when datas is closed and drained, or on the first write
// error (frames still queued at that point are not written).
func sender_send(datas <-chan []byte) {
	conn, err := sender_pool.Get()
	if err != nil {
		return
	}
	// Close runs on every exit path; presumably this hands the connection
	// back to the pool — confirm against the pool package.
	defer conn.Close()

	for {
		payload, open := <-datas
		if !open {
			return
		}
		if _, err = conn.Write(payload); err != nil {
			return
		}
	}
}