-
Notifications
You must be signed in to change notification settings - Fork 1
/
handler.go
103 lines (89 loc) · 2.73 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package websocket
import (
"net/http"
"github.com/gobwas/ws"
"github.com/mailru/easygo/netpoll"
)
// Handler is a high performance websocket handler that uses netpoll with
// read and write concurrency to allow a high number of concurrent websocket
// connections.
type Handler struct {
callback RecievedCallback
poller netpoll.Poller
readPool *pool
writePool *pool
}
// OpCode represents an operation code.
type OpCode ws.OpCode
// Operation codes that will be used in the RecievedCallback. rfc6455 defines
// more operation codes but those are hidden by the implementation.
const (
OpText OpCode = 0x1
OpBinary OpCode = 0x2
)
// RecievedCallback is the signature for the callback called when a message
// is recieved on a Channel.
type RecievedCallback func(c *Channel, op OpCode, data []byte)
// NewHandler creates a new websocket handler.
func NewHandler(callback RecievedCallback, readPoolConcurrency int, writePoolConcurrency int) (*Handler, error) {
// Creates the poller that is used when a websocket connects. This is used
// to prevent the spawning of a 2 goroutines per connection.
poller, err := netpoll.New(nil)
if err != nil {
return nil, err
}
// Create the handle object.
return &Handler{
callback: callback,
poller: poller,
readPool: newPool(readPoolConcurrency),
writePool: newPool(writePoolConcurrency),
}, nil
}
// CreateChannel upgrades the incoming http request into a websocket channel.
func (h *Handler) CreateChannel(w http.ResponseWriter, r *http.Request) (*Channel, error) {
conn, _, _, err := ws.UpgradeHTTP(r, w)
if err != nil {
// Ignore the error, the UpdateHTTP handled notifying the client.
return nil, err
}
// Open the channel with the client.
ch := newChannel(conn, h)
h.startRead(ch)
return ch, nil
}
// UpgradeHandler upgrades the incoming http request to become a websocket
// connection.
func (h *Handler) UpgradeHandler(w http.ResponseWriter, r *http.Request) {
h.CreateChannel(w, r)
}
// Start the reading of the connection from epoll.
func (h *Handler) startRead(c *Channel) {
err := h.poller.Start(c.readDesc, func(ev netpoll.Event) {
// Verify the connection is not closed.
if ev&netpoll.EventReadHup != 0 {
// Connection closed.
c.Close()
return
}
// Trigger the read. This will block once read concurrency is hit.
h.readPool.schedule(func() {
// Actually read from the channel.
c.read()
// Resume to get the next message.
err := h.poller.Resume(c.readDesc)
if err != nil {
// Failed to resume reading, close the connection.
c.Close()
}
})
})
if err == netpoll.ErrRegistered {
// Already being handled.
} else if err == netpoll.ErrClosed {
// Already closed.
} else if err != nil {
// Failed to start reading.
c.Close()
}
}