This repository has been archived by the owner on May 1, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathserver.go
123 lines (106 loc) · 2.95 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package webwire
import (
"fmt"
"log"
"net/url"
"sync"
"github.com/qbeon/webwire-go/message"
)
// server represents a headless WebWire server instance,
// where headless means there's no HTTP server that's hosting it.
// Serving, shutting down and address lookup are delegated to the transport
// field; session queries are delegated to sessionRegistry.
type server struct {
// transport is the layer the server serves on, shuts down through and
// obtains its address from
transport Transport
impl ServerImplementation
sessionManager SessionManager
sessionKeyGen SessionKeyGenerator
sessionInfoParser SessionInfoParser
// NOTE(review): Address() reports the transport's address, not this
// field — confirm where addr is still used
addr url.URL
options ServerOptions
configMsg []byte
// shutdown is set to true (under opsLock) once Shutdown has been called
shutdown bool
// shutdownRdy is received from by Shutdown when operations are still
// being processed; Shutdown blocks on it before stopping the transport
shutdownRdy chan bool
// currentOps is the number of currently processed operations; Shutdown
// reads it while holding opsLock
currentOps uint32
// opsLock is held by Shutdown while it writes shutdown and reads currentOps
opsLock *sync.Mutex
// NOTE(review): presumably guards the connections slice — confirm against
// the code that mutates it
connectionsLock *sync.Mutex
connections []*connection
sessionsEnabled bool
// sessionRegistry answers the session queries (active sessions, connections
// per session key) exposed through the Server interface
sessionRegistry *sessionRegistry
messagePool message.Pool
// Internals
warnLog *log.Logger
errorLog *log.Logger
}
// shutdownServer asks the underlying transport layer to shut down.
// It returns nil when the transport reports success; otherwise the
// transport's error is embedded in a new error describing the failure.
func (srv *server) shutdownServer() error {
	err := srv.transport.Shutdown()
	if err == nil {
		return nil
	}
	return fmt.Errorf("couldn't properly shutdown HTTP server: %s", err)
}
// Run implements the Server interface.
// It hands control to the transport's Serve method and passes its
// result back to the caller unchanged.
func (srv *server) Run() error {
	serveErr := srv.transport.Serve()
	return serveErr
}
// Address implements the Server interface.
// The returned URL is whatever the transport reports as its address.
func (srv *server) Address() url.URL {
	transportAddr := srv.transport.Address()
	return transportAddr
}
// Shutdown implements the Server interface.
// It flags the server as shutting down and then stops the transport.
// If operations are still being processed at the time of the call, it
// first blocks until a value arrives on shutdownRdy.
func (srv *server) Shutdown() error {
	// Flip the flag and sample the operation counter under the same lock
	srv.opsLock.Lock()
	srv.shutdown = true
	opsInFlight := srv.currentOps > 0
	srv.opsLock.Unlock()

	// Only wait when there's something left to finish; otherwise
	// waiting on the channel would block needlessly
	if opsInFlight {
		<-srv.shutdownRdy
	}

	return srv.shutdownServer()
}
// ActiveSessionsNum implements the Server interface.
// The count is taken directly from the session registry.
func (srv *server) ActiveSessionsNum() int {
	registry := srv.sessionRegistry
	return registry.activeSessionsNum()
}
// SessionConnectionsNum implements the Server interface.
// It forwards sessionKey to the session registry and returns the
// number the registry reports for it.
func (srv *server) SessionConnectionsNum(sessionKey string) int {
	registry := srv.sessionRegistry
	return registry.sessionConnectionsNum(sessionKey)
}
// SessionConnections implements the Server interface.
// It forwards sessionKey to the session registry and returns the
// connection list the registry reports for it.
func (srv *server) SessionConnections(sessionKey string) []Connection {
	registry := srv.sessionRegistry
	return registry.sessionConnections(sessionKey)
}
// CloseSession implements the Server interface.
//
// It asks the session registry for every connection registered under
// sessionKey and calls CloseSession on each of them, continuing past
// individual failures.
//
// Results:
//   - affectedConnections: the connections the registry returned for the
//     session, in the registry's order
//   - errors: parallel to affectedConnections; errors[i] holds the error
//     returned while closing affectedConnections[i], or nil on success
//   - generalError: non-nil if at least one connection failed; its message
//     states how many failures occurred
//
// All three results are nil when the registry returns a nil connection
// list for sessionKey.
func (srv *server) CloseSession(sessionKey string) (
	affectedConnections []Connection,
	errors []error,
	generalError error,
) {
	connections := srv.sessionRegistry.sessionConnections(sessionKey)
	// Bail out before allocating anything when there's nothing to close
	if connections == nil {
		return nil, nil, nil
	}

	affectedConnections = make([]Connection, len(connections))
	// make zero-fills the slice, so successful closures need no explicit nil
	errors = make([]error, len(connections))

	errNum := 0
	for i, connection := range connections {
		affectedConnections[i] = connection
		if err := connection.CloseSession(); err != nil {
			errors[i] = err
			errNum++
		}
	}

	if errNum > 0 {
		generalError = fmt.Errorf(
			"%d errors during the closure of a session",
			errNum,
		)
	}
	return affectedConnections, errors, generalError
}