-
Notifications
You must be signed in to change notification settings - Fork 272
/
balancer.lua
210 lines (156 loc) · 5.46 KB
/
balancer.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
-- Copyright (C) Yichun Zhang (agentzh)
local base = require "resty.core.base"
base.allows_subsystem('http', 'stream')
local ffi = require "ffi"
local C = ffi.C
local ffi_str = ffi.string
local errmsg = base.get_errmsg_ptr()
local FFI_OK = base.FFI_OK
local FFI_ERROR = base.FFI_ERROR
local int_out = ffi.new("int[1]")
local getfenv = getfenv
local error = error
local type = type
local tonumber = tonumber
local max = math.max
local subsystem = ngx.config.subsystem
local ngx_lua_ffi_balancer_set_current_peer
local ngx_lua_ffi_balancer_set_more_tries
local ngx_lua_ffi_balancer_get_last_failure
local ngx_lua_ffi_balancer_set_timeouts -- used by both stream and http
if subsystem == 'http' then
ffi.cdef[[
int ngx_http_lua_ffi_balancer_set_current_peer(ngx_http_request_t *r,
const unsigned char *addr, size_t addr_len, int port, char **err);
int ngx_http_lua_ffi_balancer_set_more_tries(ngx_http_request_t *r,
int count, char **err);
int ngx_http_lua_ffi_balancer_get_last_failure(ngx_http_request_t *r,
int *status, char **err);
int ngx_http_lua_ffi_balancer_set_timeouts(ngx_http_request_t *r,
long connect_timeout, long send_timeout,
long read_timeout, char **err);
]]
ngx_lua_ffi_balancer_set_current_peer =
C.ngx_http_lua_ffi_balancer_set_current_peer
ngx_lua_ffi_balancer_set_more_tries =
C.ngx_http_lua_ffi_balancer_set_more_tries
ngx_lua_ffi_balancer_get_last_failure =
C.ngx_http_lua_ffi_balancer_get_last_failure
ngx_lua_ffi_balancer_set_timeouts =
C.ngx_http_lua_ffi_balancer_set_timeouts
elseif subsystem == 'stream' then
ffi.cdef[[
int ngx_stream_lua_ffi_balancer_set_current_peer(
ngx_stream_lua_request_t *r,
const unsigned char *addr, size_t addr_len, int port, char **err);
int ngx_stream_lua_ffi_balancer_set_more_tries(ngx_stream_lua_request_t *r,
int count, char **err);
int ngx_stream_lua_ffi_balancer_get_last_failure(
ngx_stream_lua_request_t *r, int *status, char **err);
int ngx_stream_lua_ffi_balancer_set_timeouts(ngx_stream_lua_request_t *r,
long connect_timeout, long timeout, char **err);
]]
ngx_lua_ffi_balancer_set_current_peer =
C.ngx_stream_lua_ffi_balancer_set_current_peer
ngx_lua_ffi_balancer_set_more_tries =
C.ngx_stream_lua_ffi_balancer_set_more_tries
ngx_lua_ffi_balancer_get_last_failure =
C.ngx_stream_lua_ffi_balancer_get_last_failure
local ngx_stream_lua_ffi_balancer_set_timeouts =
C.ngx_stream_lua_ffi_balancer_set_timeouts
ngx_lua_ffi_balancer_set_timeouts =
function(r, connect_timeout, send_timeout, read_timeout, err)
local timeout = max(send_timeout, read_timeout)
return ngx_stream_lua_ffi_balancer_set_timeouts(r, connect_timeout,
timeout, err)
end
else
error("unknown subsystem: " .. subsystem)
end
local peer_state_names = {
[1] = "keepalive",
[2] = "next",
[4] = "failed",
}
local _M = { version = base.version }
function _M.set_current_peer(addr, port)
local r = getfenv(0).__ngx_req
if not r then
error("no request found")
end
if not port then
port = 0
elseif type(port) ~= "number" then
port = tonumber(port)
end
local rc = ngx_lua_ffi_balancer_set_current_peer(r, addr, #addr,
port, errmsg)
if rc == FFI_OK then
return true
end
return nil, ffi_str(errmsg[0])
end
function _M.set_more_tries(count)
local r = getfenv(0).__ngx_req
if not r then
error("no request found")
end
local rc = ngx_lua_ffi_balancer_set_more_tries(r, count, errmsg)
if rc == FFI_OK then
if errmsg[0] == nil then
return true
end
return true, ffi_str(errmsg[0]) -- return the warning
end
return nil, ffi_str(errmsg[0])
end
function _M.get_last_failure()
local r = getfenv(0).__ngx_req
if not r then
error("no request found")
end
local state = ngx_lua_ffi_balancer_get_last_failure(r, int_out, errmsg)
if state == 0 then
return nil
end
if state == FFI_ERROR then
return nil, nil, ffi_str(errmsg[0])
end
return peer_state_names[state] or "unknown", int_out[0]
end
function _M.set_timeouts(connect_timeout, send_timeout, read_timeout)
local r = getfenv(0).__ngx_req
if not r then
error("no request found")
end
if not connect_timeout then
connect_timeout = 0
elseif type(connect_timeout) ~= "number" or connect_timeout <= 0 then
error("bad connect timeout", 2)
else
connect_timeout = connect_timeout * 1000
end
if not send_timeout then
send_timeout = 0
elseif type(send_timeout) ~= "number" or send_timeout <= 0 then
error("bad send timeout", 2)
else
send_timeout = send_timeout * 1000
end
if not read_timeout then
read_timeout = 0
elseif type(read_timeout) ~= "number" or read_timeout <= 0 then
error("bad read timeout", 2)
else
read_timeout = read_timeout * 1000
end
local rc
rc = ngx_lua_ffi_balancer_set_timeouts(r, connect_timeout,
send_timeout, read_timeout,
errmsg)
if rc == FFI_OK then
return true
end
return false, ffi_str(errmsg[0])
end
return _M