-
Notifications
You must be signed in to change notification settings - Fork 0
/
rpc.go
388 lines (328 loc) · 8.46 KB
/
rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
// Simple RPC library with amqp as default underlying transport
package rpc
import (
"context"
"errors"
"github.com/RidgeA/amqp-rpc/transport"
"github.com/RidgeA/amqp-rpc/transport/amqp"
"github.com/google/uuid"
"log"
"os"
"strconv"
)
var (
silentLog = func(format string, args ...interface{}) {}
errorLog = log.Printf
)
// Instance mode: Client, Server or both
const (
ModeClient = 1 << iota
ModeServer
ModeDuplex = ModeClient | ModeServer
)
type (
//Interface that represents rpc server
Server interface {
Start() error
Shutdown()
RegisterHandler(string, HandlerFunc, ...HandlerOptionsFunc)
}
//Interface that represents rpc client
Client interface {
Start() error
Shutdown()
Call(context.Context, string, []byte, bool) ([]byte, error)
}
//Interface that represents rpc client and server
Duplex interface {
Start() error
Shutdown()
Call(context.Context, string, []byte, bool) ([]byte, error)
RegisterHandler(string, HandlerFunc, ...HandlerOptionsFunc)
}
//Interface for transport implementation
Transport interface {
Initialize() error
Shutdown()
Send(call transport.Call) error
Subscribe(key string, subscription transport.SubscribeFunc, throughput uint) error
Reply(transport.Reply) error
}
//Logger function
LogFunc func(string, ...interface{})
//A function to handle incoming message
HandlerFunc func([]byte) ([]byte, error)
//Function type to set options for RPC instance
OptionsFunc func(*rpc)
//Function type to set options for handler
HandlerOptionsFunc func(*handler)
handler struct {
method string
handler HandlerFunc
throughput uint
}
request struct {
id string
payload []byte
source string
method string
}
response struct {
req transport.Call
payload []byte
}
rpc struct {
errorf, info, debug LogFunc
extTransport bool
t Transport
instanceId string
mode int
url string
name string
listeners map[string]chan transport.Call
handlers map[string]*handler
}
)
func (r response) Request() transport.Call {
return r.req
}
func (r response) Payload() []byte {
return r.payload
}
func (p request) ID() string {
return p.id
}
func (p request) Payload() []byte {
return p.payload
}
func (p request) Source() string {
return p.source
}
func (p request) Method() string {
return p.method
}
//Creates new server instance with 'name'.
//Optional params could be configured via option functions.
//The name has to be the same across the whole infrastructure.
//Either url to ampq server or transport has to be configured via options.
func NewServer(name string, opts ...OptionsFunc) Server {
r := newRPC(name, opts...)
r.mode = ModeServer
r.handlers = make(map[string]*handler)
return r
}
//Creates new client instance with 'name'.
//Optional params could be configured via option functions.
//The name has to be the same across the whole infrastructure.
//Either url to ampq server or transport has to be configured via options.
func NewClient(name string, opts ...OptionsFunc) Client {
r := newRPC(name, opts...)
r.mode = ModeClient
r.listeners = make(map[string]chan transport.Call)
return r
}
//Creates new RPC instance with 'name' that could act both as server and client.
//Optional params could be configured via option functions.
//The name has to be the same across the whole infrastructure.
//Either url to ampq server or transport has to be configured via options.
func NewDuplex(name string, opts ...OptionsFunc) Duplex {
r := newRPC(name, opts...)
r.mode = ModeDuplex
r.listeners = make(map[string]chan transport.Call)
r.handlers = make(map[string]*handler)
return r
}
//Sets function to log errors
func SetError(f LogFunc) OptionsFunc {
return func(r *rpc) {
r.errorf = f
}
}
//Sets function to log informational messages
func SetInfo(f LogFunc) OptionsFunc {
return func(r *rpc) {
r.info = f
}
}
//Sets function to log debug messages
func SetDebug(f LogFunc) OptionsFunc {
return func(r *rpc) {
r.debug = f
}
}
//Sets URL to amqp server
func SetUrl(url string) OptionsFunc {
return func(r *rpc) {
r.url = url
}
}
//Sets already configured transport instance
//Transport implementation has to be initialized and destroyed manually
func SetTransport(transport Transport) OptionsFunc {
return func(r *rpc) {
r.extTransport = true
r.t = transport
}
}
// Sets an option for the method handler to limit throughput
// Limitation passes to underlying transport and has to be implemented by transport
func SetHandlerThroughput(throughput uint) HandlerOptionsFunc {
return func(h *handler) {
h.throughput = throughput
}
}
//Method to start RPC instance
//Depends on selected mode it initialize server, client or both.
//If transport instance wasn't passed manually it also initializes the default transport (amqp)
func (rpc *rpc) Start() error {
if !rpc.extTransport {
if err := rpc.t.Initialize(); err != nil {
return err
}
}
if rpc.mode&ModeClient == ModeClient {
if err := rpc.startClient(); err != nil {
return err
}
}
if rpc.mode&ModeServer == ModeServer {
if err := rpc.startServer(); err != nil {
return err
}
}
return nil
}
//Method for shutdown instance properly and close underlying transport instance
//The method doesn't close transport implementation if it has been passed manually
func (rpc *rpc) Shutdown() {
rpc.info("Shutting down rpc")
if !rpc.extTransport {
rpc.t.Shutdown()
}
}
//Method to call remote function
func (rpc *rpc) Call(ctx context.Context, method string, payload []byte, wait bool) ([]byte, error) {
rpc.debug("Calling method %s", method)
var response <-chan transport.Call
r := request{
id: uuid.New().String(),
payload: payload,
method: method,
source: rpc.instanceId,
}
err := rpc.t.Send(r)
// return either errorf or just nil if response doesn't required
if err != nil || !wait {
return nil, err
}
defer rpc.removeListener(r.id)
response = rpc.addListener(r.id)
var responseData []byte
select {
case responseMessage := <-response:
rpc.debug("Got response from server")
responseData = responseMessage.Payload()
case <-ctx.Done():
rpc.info("Got signal from context")
err = errors.New("canceled by context")
}
return responseData, err
}
//Registers a handler for the method
func (rpc *rpc) RegisterHandler(method string, f HandlerFunc, options ...HandlerOptionsFunc) {
rpc.debug("Register handler for method %rpc", method)
h := &handler{
method: method,
handler: f,
}
for _, setter := range options {
setter(h)
}
rpc.handlers[method] = h
}
func newRPC(name string, opts ...OptionsFunc) (r *rpc) {
r = new(rpc)
r.name = name
r.errorf = errorLog
r.info = silentLog
r.debug = silentLog
r.instanceId = r.createInstanceId()
for _, setter := range opts {
setter(r)
}
if r.t == nil {
r.t = amqp.New(
r.name,
r.instanceId,
r.url,
)
}
return
}
func (rpc *rpc) handle(f HandlerFunc) transport.SubscribeFunc {
return func(p transport.Call) error {
responsePayload, err := f(p.Payload())
if err != nil {
return err
}
if responsePayload != nil {
response := response{
req: p,
payload: responsePayload,
}
err = rpc.t.Reply(response)
}
return nil
}
}
func (rpc *rpc) startClient() error {
rpc.info("Starting client")
if err := rpc.t.Subscribe(rpc.instanceId, rpc.dispatchResponse, 0); err != nil {
return err
}
return nil
}
func (rpc *rpc) startServer() error {
rpc.info("Starting server")
var err error
for _, handler := range rpc.handlers {
err = rpc.t.Subscribe(handler.method, rpc.handle(handler.handler), handler.throughput)
if err != nil {
return err
}
}
return nil
}
func (rpc *rpc) createInstanceId() string {
host, err := os.Hostname()
if err != nil {
host = "unknown.host"
}
pid := strconv.Itoa(os.Getpid())
return rpc.name + "." + pid + "." + host
}
func (rpc *rpc) dispatchResponse(message transport.Call) error {
id := message.ID()
listener, exists := rpc.listeners[id]
rpc.info("Dispatching response, id: %s", id)
if exists {
listener <- message
} else {
rpc.errorf("Unknown id: %s", id)
}
return nil
}
func (rpc *rpc) addListener(id string) <-chan transport.Call {
listener := make(chan transport.Call)
rpc.info("Registering callback listener for id: %s", id)
rpc.listeners[id] = listener
return listener
}
func (rpc *rpc) removeListener(id string) {
rpc.info("Removing listener for id: %s", id)
listener, exists := rpc.listeners[id]
if exists {
close(listener)
delete(rpc.listeners, id)
}
}