-
Notifications
You must be signed in to change notification settings - Fork 42
/
Copy pathclient.lua
1201 lines (1053 loc) · 36.9 KB
/
client.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
--- MQTT client module
-- @module mqtt.client
-- @alias client
local client = {}
-- TODO: list event names
-------
-- load required stuff
local type = type
local error = error
local select = select
local require = require
local tostring = tostring
local os = require("os")
local os_time = os.time
local string = require("string")
local str_format = string.format
local str_gsub = string.gsub
local str_match = string.match
local table = require("table")
local table_remove = table.remove
local coroutine = require("coroutine")
local coroutine_create = coroutine.create
local coroutine_resume = coroutine.resume
local coroutine_yield = coroutine.yield
local math = require("math")
local math_random = math.random
local math_randomseed = math.randomseed
local luamqtt_VERSION
local protocol = require("mqtt.protocol")
local packet_type = protocol.packet_type
local check_qos = protocol.check_qos
local next_packet_id = protocol.next_packet_id
local packet_id_required = protocol.packet_id_required
local protocol4 = require("mqtt.protocol4")
local make_packet4 = protocol4.make_packet
local parse_packet4 = protocol4.parse_packet
local protocol5 = require("mqtt.protocol5")
local make_packet5 = protocol5.make_packet
local parse_packet5 = protocol5.parse_packet
local ioloop = require("mqtt.ioloop")
local ioloop_get = ioloop.get
-------
-- pseudo-random generator initialized flag
local random_initialized = false
-------
--- MQTT client instance metatable
-- @type client_mt
local client_mt = {}
client_mt.__index = client_mt
--- Create and initialize MQTT client instance
-- @tparam table args MQTT client creation arguments table
-- @tparam string args.uri MQTT broker uri to connect.
-- Expecting "host:port" or "host" format, in second case the port will be selected automatically:
-- 1883 port for plain or 8883 for secure network connections
-- @tparam string args.clean clean session start flag
-- @tparam[opt=4] number args.version MQTT protocol version to use, either 4 (for MQTT v3.1.1) or 5 (for MQTT v5.0).
-- Also you may use special values mqtt.v311 or mqtt.v50 for this field.
-- @tparam[opt] string args.id MQTT client ID, will be generated by luamqtt library if absent
-- @tparam[opt] string args.username username for authorization on MQTT broker
-- @tparam[opt] string args.password password for authorization on MQTT broker; not acceptable in absence of username
-- @tparam[opt=false] boolean,table args.secure use secure network connection, provided by luasec lua module;
-- set to true to select default params: { mode="client", protocol="tlsv1_2", verify="none", options="all" }
-- or set to luasec-compatible table, for example with cafile="...", certificate="...", key="..."
-- @tparam[opt] table args.will will message table with required fields { topic="...", payload="..." }
-- and optional fields { qos=1...3, retain=true/false }
-- @tparam[opt=60] number args.keep_alive time interval for client to send PINGREQ packets to the server when network connection is inactive
-- @tparam[opt=false] boolean args.reconnect force created MQTT client to reconnect on connection close.
-- Set to number value to provide reconnect timeout in seconds
-- It's not recommended to use values < 3
-- @tparam[opt] table args.connector connector table to open and send/receive packets over network connection.
-- default is require("mqtt.luasocket"), or require("mqtt.luasocket_ssl") if secure argument is set
-- @tparam[opt="ssl"] string args.ssl_module module name for the luasec-compatible ssl module, default is "ssl"
-- may be used in some non-standard lua environments with own luasec-compatible ssl module
-- @treturn client_mt MQTT client instance table
function client_mt:__init(args)
if not luamqtt_VERSION then
luamqtt_VERSION = require("mqtt")._VERSION
end
-- fetch and validate client args
local a = {} -- own client copy of args
for key, value in pairs(args) do
if type(key) ~= "string" then
error("expecting string key in args, got: "..type(key))
end
local value_type = type(value)
if key == "uri" then
assert(value_type == "string", "expecting uri to be a string")
a.uri = value
elseif key == "clean" then
assert(value_type == "boolean", "expecting clean to be a boolean")
a.clean = value
elseif key == "version" then
assert(value_type == "number", "expecting version to be a number")
assert(value == 4 or value == 5, "expecting version to be a value either 4 or 5")
a.version = value
elseif key == "id" then
assert(value_type == "string", "expecting id to be a string")
a.id = value
elseif key == "username" then
assert(value_type == "string", "expecting username to be a string")
a.username = value
elseif key == "password" then
assert(value_type == "string", "expecting password to be a string")
a.password = value
elseif key == "secure" then
assert(value_type == "boolean" or value_type == "table", "expecting secure to be a boolean or table")
a.secure = value
elseif key == "will" then
assert(value_type == "table", "expecting will to be a table")
a.will = value
elseif key == "keep_alive" then
assert(value_type == "number", "expecting keep_alive to be a number")
a.keep_alive = value
elseif key == "properties" then
assert(value_type == "table", "expecting properties to be a table")
a.properties = value
elseif key == "user_properties" then
assert(value_type == "table", "expecting user_properties to be a table")
a.user_properties = value
elseif key == "reconnect" then
assert(value_type == "boolean" or value_type == "number", "expecting reconnect to be a boolean or number")
a.reconnect = value
elseif key == "connector" then
a.connector = value
elseif key == "ssl_module" then
assert(value_type == "string", "expecting ssl_module to be a string")
a.ssl_module = value
else
error("unexpected key in client args: "..key.." = "..tostring(value))
end
end
-- check required arguments
assert(a.uri, 'expecting uri="..." to create MQTT client')
assert(a.clean ~= nil, "expecting clean=true or clean=false to create MQTT client")
assert(not a.password or a.username, "password is not accepted in absence of username")
if not a.id then
-- generate random client id
if not random_initialized then
-- initialize pseudo-random generator with current time seed
math_randomseed(os_time())
random_initialized = true
end
a.id = str_format("luamqtt-v%s-%07x", str_gsub(luamqtt_VERSION, "[^%d]", "-"), math_random(1, 0xFFFFFFF))
end
-- default connector
if a.connector == nil then
if a.secure then
a.connector = require("mqtt.luasocket_ssl")
else
a.connector = require("mqtt.luasocket")
end
end
-- validate connector content
assert(type(a.connector) == "table", "expecting connector to be a table")
assert(type(a.connector.connect) == "function", "expecting connector.connect to be a function")
assert(type(a.connector.shutdown) == "function", "expecting connector.shutdown to be a function")
assert(type(a.connector.send) == "function", "expecting connector.send to be a function")
assert(type(a.connector.receive) == "function", "expecting connector.receive to be a function")
-- will table content check
if a.will then
assert(type(a.will.topic) == "string", "expecting will.topic to be a string")
assert(type(a.will.payload) == "string", "expecting will.payload to be a string")
if a.will.qos ~= nil then
assert(type(a.will.qos) == "number", "expecting will.qos to be a number")
assert(check_qos(a.will.qos), "expecting will.qos to be a valid QoS value")
end
if a.will.retain ~= nil then
assert(type(a.will.retain) == "boolean", "expecting will.retain to be a boolean")
end
end
-- default keep_alive
if not a.keep_alive then
a.keep_alive = 60
end
-- client args
self.args = a
-- event handlers
self.handlers = {
connect = {},
subscribe = {},
unsubscribe = {},
message = {},
acknowledge = {},
error = {},
close = {},
auth = {},
}
self._handling = {}
self._to_remove_handlers = {}
-- state
self.first_connect = true -- contains true to perform one network connection attemt after client creation
self.send_time = 0 -- time of the last network send from client side
-- packet creation/parse functions according version
if not a.version then
a.version = 4
end
if a.version == 4 then
self._make_packet = make_packet4
self._parse_packet = parse_packet4
elseif a.version == 5 then
self._make_packet = make_packet5
self._parse_packet = parse_packet5
end
-- automatically add client to default ioloop, if it's available and running, then start connecting
local loop = ioloop_get(false)
if loop and loop.running then
loop:add(self)
self:start_connecting()
end
end
--- Add functions as handlers of given events
-- @param ... (event_name, function) or { event1 = func1, event2 = func2 } table
function client_mt:on(...)
local nargs = select("#", ...)
local events
if nargs == 2 then
events = { [select(1, ...)] = select(2, ...) }
elseif nargs == 1 then
events = select(1, ...)
else
error("invalid args: expected only one or two arguments")
end
for event, func in pairs(events) do
assert(type(event) == "string", "expecting event to be a string")
assert(type(func) == "function", "expecting func to be a function")
local handlers = self.handlers[event]
if not handlers then
error("invalid event '"..tostring(event).."' to handle")
end
handlers[#handlers + 1] = func
end
end
-- Remove one item from the list-table with full-iteration
local function remove_item(list, item)
for i, test in ipairs(list) do
if test == item then
table_remove(list, i)
return
end
end
end
--- Remove given function handler for specified event
-- @tparam string event event name to remove handler
-- @tparam function func handler function to remove
function client_mt:off(event, func)
local handlers = self.handlers[event]
if not handlers then
error("invalid event '"..tostring(event).."' to handle")
end
if self._handling[event] then
-- this event is handling now, schedule the function removing to the moment after all handlers will be called for the event
local to_remove = self._to_remove_handlers[event] or {}
to_remove[#to_remove + 1] = func
self._to_remove_handlers[event] = to_remove
else
-- it's ok to remove given function just now
remove_item(handlers, func)
end
return true
end
--- Subscribe to specified topic. Returns the SUBSCRIBE packet id and calls optional callback when subscription will be created on broker
-- @tparam table args subscription arguments
-- @tparam string args.topic topic to subscribe
-- @tparam[opt=0] number args.qos QoS level for subscription
-- @tparam[opt] table args.properties properties for subscribe operation
-- @tparam[opt] table args.user_properties user properties for subscribe operation
-- @tparam[opt] function args.callback callback function to be called when subscription will be created
-- @return packet id on success or false and error message on failure
function client_mt:subscribe(args)
-- fetch and validate args
assert(type(args) == "table", "expecting args to be a table")
assert(type(args.topic) == "string", "expecting args.topic to be a string")
assert(args.qos == nil or (type(args.qos) == "number" and check_qos(args.qos)), "expecting valid args.qos value")
assert(args.properties == nil or type(args.properties) == "table", "expecting args.properties to be a table")
assert(args.user_properties == nil or type(args.user_properties) == "table", "expecting args.user_properties to be a table")
assert(args.callback == nil or type(args.callback) == "function", "expecting args.callback to be a function")
-- check connection is alive
if not self.connection then
return false, "network connection is not opened"
end
-- create SUBSCRIBE packet
local pargs = {
type = packet_type.SUBSCRIBE,
subscriptions = {
{
topic = args.topic,
qos = args.qos,
},
},
properties = args.properties,
user_properties = args.user_properties,
}
self:_assign_packet_id(pargs)
local packet_id = pargs.packet_id
local subscribe = self._make_packet(pargs)
-- send SUBSCRIBE packet
local ok, err = self:_send_packet(subscribe)
if not ok then
err = "failed to send SUBSCRIBE: "..err
self:handle("error", err, self)
self:close_connection("error")
return false, err
end
-- add subscribe callback
local callback = args.callback
if callback then
local function handler(suback, ...)
if suback.packet_id == packet_id then
self:off("subscribe", handler)
callback(suback, ...)
end
end
self:on("subscribe", handler)
end
-- returns assigned packet id
return packet_id
end
--- Unsubscribe from specified topic, and calls optional callback when subscription will be removed on broker
-- @tparam table args subscription arguments
-- @tparam string args.topic topic to unsubscribe
-- @tparam[opt] table args.properties properties for unsubscribe operation
-- @tparam[opt] table args.user_properties user properties for unsubscribe operation
-- @tparam[opt] function args.callback callback function to be called when subscription will be removed on broker
-- @return packet id on success or false and error message on failure
function client_mt:unsubscribe(args)
-- fetch and validate args
assert(type(args) == "table", "expecting args to be a table")
assert(type(args.topic) == "string", "expecting args.topic to be a string")
assert(args.properties == nil or type(args.properties) == "table", "expecting args.properties to be a table")
assert(args.user_properties == nil or type(args.user_properties) == "table", "expecting args.user_properties to be a table")
assert(args.callback == nil or type(args.callback) == "function", "expecting args.callback to be a function")
-- check connection is alive
if not self.connection then
return false, "network connection is not opened"
end
-- create UNSUBSCRIBE packet
local pargs = {
type = packet_type.UNSUBSCRIBE,
subscriptions = {args.topic},
properties = args.properties,
user_properties = args.user_properties,
}
self:_assign_packet_id(pargs)
local packet_id = pargs.packet_id
local unsubscribe = self._make_packet(pargs)
-- send UNSUBSCRIBE packet
local ok, err = self:_send_packet(unsubscribe)
if not ok then
err = "failed to send UNSUBSCRIBE: "..err
self:handle("error", err, self)
self:close_connection("error")
return false, err
end
-- add unsubscribe callback
local callback = args.callback
if callback then
local function handler(unsuback, ...)
if unsuback.packet_id == packet_id then
self:off("unsubscribe", handler)
callback(unsuback, ...)
end
end
self:on("unsubscribe", handler)
end
-- returns assigned packet id
return packet_id
end
--- Publish message to broker
-- @tparam table args publish operation arguments table
-- @tparam string args.topic topic to publish message
-- @tparam[opt] string args.payload publish message payload
-- @tparam[opt=0] number args.qos QoS level for message publication
-- @tparam[opt=false] boolean args.retain retain message publication flag
-- @tparam[opt=false] boolean args.dup dup message publication flag
-- @tparam[opt] table args.properties properties for publishing message
-- @tparam[opt] table args.user_properties user properties for publishing message
-- @tparam[opt] function args.callback callback to call when publihsed message will be acknowledged
-- @return true or packet id on success or false and error message on failure
function client_mt:publish(args)
-- fetch and validate args
assert(type(args) == "table", "expecting args to be a table")
assert(type(args.topic) == "string", "expecting args.topic to be a string")
assert(args.payload == nil or type(args.payload) == "string", "expecting args.payload to be a string")
assert(args.qos == nil or type(args.qos) == "number", "expecting args.qos to be a number")
if args.qos then
assert(check_qos(args.qos), "expecting qos to be a valid QoS value")
end
assert(args.retain == nil or type(args.retain) == "boolean", "expecting args.retain to be a boolean")
assert(args.dup == nil or type(args.dup) == "boolean", "expecting args.dup to be a boolean")
assert(args.properties == nil or type(args.properties) == "table", "expecting args.properties to be a table")
assert(args.user_properties == nil or type(args.user_properties) == "table", "expecting args.user_properties to be a table")
assert(args.callback == nil or type(args.callback) == "function", "expecting args.callback to be a function")
-- check connection is alive
local conn = self.connection
if not conn then
return false, "network connection is not opened"
end
-- create PUBLISH packet
args.type = packet_type.PUBLISH
self:_assign_packet_id(args)
local packet_id = args.packet_id
local publish = self._make_packet(args)
-- send PUBLISH packet
local ok, err = self:_send_packet(publish)
if not ok then
err = "failed to send PUBLISH: "..err
self:handle("error", err, self)
self:close_connection("error")
return false, err
end
-- record packet id as waited for QoS 2 exchange
if args.qos == 2 then
conn.wait_for_pubrec[packet_id] = true
end
-- add acknowledge callback
local callback = args.callback
if callback then
if packet_id then
local function handler(ack, ...)
if ack.packet_id == packet_id then
self:off("acknowledge", handler)
callback(ack, ...)
end
end
self:on("acknowledge", handler)
else
callback("no ack for QoS 0 message", self)
end
end
-- returns assigned packet id
return packet_id or true
end
--- Acknowledge given received message
-- @tparam packet_mt msg PUBLISH message to acknowledge
-- @tparam[opt=0] number rc The reason code field of PUBACK packet in MQTT v5.0 protocol
-- @tparam[opt] table properties properties for PUBACK/PUBREC packets
-- @tparam[opt] table user_properties user properties for PUBACK/PUBREC packets
-- @return true on success or false and error message on failure
function client_mt:acknowledge(msg, rc, properties, user_properties)
assert(type(msg) == "table" and msg.type == packet_type.PUBLISH, "expecting msg to be a publish packet")
assert(rc == nil or type(rc) == "number", "expecting rc to be a number")
assert(properties == nil or type(properties) == "table", "expecting properties to be a table")
assert(user_properties == nil or type(user_properties) == "table", "expecting user_properties to be a table")
-- check connection is alive
local conn = self.connection
if not conn then
return false, "network connection is not opened"
end
-- check packet needs to be acknowledged
local packet_id = msg.packet_id
if not packet_id then
return true
end
if msg.qos == 1 then
-- PUBACK should be sent
-- create PUBACK packet
local puback = self._make_packet{
type=packet_type.PUBACK,
packet_id=packet_id,
rc=rc or 0,
properties=properties,
user_properties=user_properties,
}
-- send PUBACK packet
local ok, err = self:_send_packet(puback)
if not ok then
err = "failed to send PUBACK: "..err
self:handle("error", err, self)
self:close_connection("error")
return false, err
end
elseif msg.qos == 2 then
-- PUBREC should be sent and packet_id should be remembered for PUBREL+PUBCOMP sequence
-- create PUBREC packet
local pubrec = self._make_packet{
type=packet_type.PUBREC,
packet_id=packet_id,
rc=rc or 0,
properties=properties,
user_properties=user_properties,
}
-- send PUBREC packet
local ok, err = self:_send_packet(pubrec)
if not ok then
err = "failed to send PUBREC: "..err
self:handle("error", err, self)
self:close_connection("error")
return false, err
end
-- store packet id as waiting for PUBREL
conn.wait_for_pubrel[packet_id] = true
end
return true
end
--- Send DISCONNECT packet to the broker and close the connection
-- @tparam[opt=0] number rc The Disconnect Reason Code value from MQTT v5.0 protocol
-- @tparam[opt] table properties properties for PUBACK/PUBREC packets
-- @tparam[opt] table user_properties user properties for PUBACK/PUBREC packets
-- @return true on success or false and error message on failure
function client_mt:disconnect(rc, properties, user_properties)
-- validate args
assert(rc == nil or type(rc) == "number", "expecting rc to be a number")
assert(properties == nil or type(properties) == "table", "expecting properties to be a table")
assert(user_properties == nil or type(user_properties) == "table", "expecting user_properties to be a table")
-- check connection is alive
if not self.connection then
return false, "network connection is not opened"
end
-- create DISCONNECT packet
local disconnect = self._make_packet{
type=packet_type.DISCONNECT,
rc=rc or 0,
properties=properties,
user_properties=user_properties,
}
-- send DISCONNECT packet
local ok, err = self:_send_packet(disconnect)
if not ok then
err = "failed to send DISCONNECT: "..err
self:handle("error", err, self)
self:close_connection("error")
return false, err
end
-- now close connection
self:close_connection("connection closed by client")
return true
end
--- Send AUTH packet to authenticate client on broker, in MQTT v5.0 protocol
-- @tparam[opt=0] number rc Authenticate Reason Code
-- @tparam[opt] table properties properties for PUBACK/PUBREC packets
-- @tparam[opt] table user_properties user properties for PUBACK/PUBREC packets
-- @return true on success or false and error message on failure
function client_mt:auth(rc, properties, user_properties)
-- validate args
assert(rc == nil or type(rc) == "number", "expecting rc to be a number")
assert(properties == nil or type(properties) == "table", "expecting properties to be a table")
assert(user_properties == nil or type(user_properties) == "table", "expecting user_properties to be a table")
assert(self.args.version == 5, "allowed only in MQTT v5.0 protocol")
-- check connection is alive
if not self.connection then
return false, "network connection is not opened"
end
-- create AUTH packet
local auth = self._make_packet{
type=packet_type.AUTH,
rc=rc or 0,
properties=properties,
user_properties=user_properties,
}
-- send AUTH packet
local ok, err = self:_send_packet(auth)
if not ok then
err = "failed to send AUTH: "..err
self:handle("error", err, self)
self:close_connection("error")
return false, err
end
return true
end
--- Immediately close established network connection, without graceful session finishing with DISCONNECT packet
-- @tparam[opt] string reason the reasong string of connection close
function client_mt:close_connection(reason)
assert(not reason or type(reason) == "string", "expecting reason to be a string")
local conn = self.connection
if not conn then
return true
end
local args = self.args
args.connector.shutdown(conn)
self.connection = nil
conn.close_reason = reason or "unspecified"
self:handle("close", conn, self)
-- check connection is still closed (self.connection may be re-created in "close" handler)
if not self.connection then
-- remove from ioloop
if self.ioloop and not args.reconnect then
self.ioloop:remove(self)
end
end
return true
end
--- Start connecting to broker
-- @return true on success or false and error message on failure
function client_mt:start_connecting()
-- print("start connecting") -- debug
-- open network connection
local ok, err = self:open_connection()
if not ok then
return false, err
end
-- send CONNECT packet
ok, err = self:send_connect()
if not ok then
return false, err
end
return true
end
--- Low-level methods
-- @section low-level
--- Send PINGREQ packet
-- @return true on success or false and error message on failure
function client_mt:send_pingreq()
-- check connection is alive
if not self.connection then
return false, "network connection is not opened"
end
-- create PINGREQ packet
local pingreq = self._make_packet{
type=packet_type.PINGREQ,
}
-- send PINGREQ packet
local ok, err = self:_send_packet(pingreq)
if not ok then
err = "failed to send PINGREQ: "..err
self:handle("error", err, self)
self:close_connection("error")
return false, err
end
return true
end
--- Open network connection to the broker
-- @return true on success or false and error message on failure
function client_mt:open_connection()
if self.connection then
return true
end
local args = self.args
assert(args.connector, "no connector configured in MQTT client")
-- create connection table
local conn = {
uri = args.uri,
wait_for_pubrec = {}, -- a table with packet_id of parially acknowledged sent packets in QoS 2 exchange process
wait_for_pubrel = {}, -- a table with packet_id of parially acknowledged received packets in QoS 2 exchange process
}
client_mt._parse_uri(args, conn)
client_mt._apply_secure(args, conn)
-- perform connect
local ok, err = args.connector.connect(conn)
if not ok then
err = "failed to open network connection: "..err
self:handle("error", err, self)
return false, err
end
-- assign connection
self.connection = conn
-- create receive function
local receive = args.connector.receive
self.connection.recv_func = function(size)
return receive(conn, size)
end
self:_apply_network_timeout()
return true
end
--- Send CONNECT packet into opened network connection
-- @return true on success or false and error message on failure
function client_mt:send_connect()
-- check connection is alive
if not self.connection then
return false, "network connection is not opened"
end
local args = self.args
-- create CONNECT packet
local connect = self._make_packet{
type=packet_type.CONNECT,
id=args.id,
clean = args.clean,
username = args.username,
password = args.password,
will = args.will,
keep_alive = args.keep_alive,
properties = args.properties,
user_properties = args.user_properties,
}
-- send CONNECT packet
local ok, err = self:_send_packet(connect)
if not ok then
err = "failed to send CONNECT: "..err
self:handle("error", err, self)
self:close_connection("error")
return false, err
end
-- reset last packet id
self._last_packet_id = nil
return true
end
-- Internal methods
-- Set or rest ioloop for MQTT client
function client_mt:set_ioloop(loop)
self.ioloop = loop
self:_apply_network_timeout()
end
-- Send PUBREL acknowledge packet - second phase of QoS 2 exchange
-- Returns true on success or false and error message on failure
function client_mt:acknowledge_pubrel(packet_id)
-- check connection is alive
if not self.connection then
return false, "network connection is not opened"
end
-- create PUBREL packet
local pubrel = self._make_packet{type=packet_type.PUBREL, packet_id=packet_id, rc=0}
-- send PUBREL packet
local ok, err = self:_send_packet(pubrel)
if not ok then
err = "failed to send PUBREL: "..err
self:handle("error", err, self)
self:close_connection("error")
return false, err
end
return true
end
-- Send PUBCOMP acknowledge packet - last phase of QoS 2 exchange
-- Returns true on success or false and error message on failure
function client_mt:acknowledge_pubcomp(packet_id)
-- check connection is alive
if not self.connection then
return false, "network connection is not opened"
end
-- create PUBCOMP packet
local pubcomp = self._make_packet{type=packet_type.PUBCOMP, packet_id=packet_id, rc=0}
-- send PUBCOMP packet
local ok, err = self:_send_packet(pubcomp)
if not ok then
err = "failed to send PUBCOMP: "..err
self:handle("error", err, self)
self:close_connection("error")
return false, err
end
return true
end
-- Call specified event handlers
function client_mt:handle(event, ...)
local handlers = self.handlers[event]
if not handlers then
error("invalid event '"..tostring(event).."' to handle")
end
self._handling[event] = true -- protecting self.handlers[event] table from modifications by client_mt:off() when iterating
for _, handler in ipairs(handlers) do
handler(...)
end
self._handling[event] = nil
-- process handlers removing, scheduled by client_mt:off()
local to_remove = self._to_remove_handlers[event]
if to_remove then
for _, func in ipairs(to_remove) do
remove_item(handlers, func)
end
self._to_remove_handlers[event] = nil
end
end
-- Internal methods
-- Assign next packet id for given packet creation args
function client_mt:_assign_packet_id(pargs)
if not pargs.packet_id then
if packet_id_required(pargs) then
self._last_packet_id = next_packet_id(self._last_packet_id)
pargs.packet_id = self._last_packet_id
end
end
end
-- Receive packet function in sync mode
local function sync_recv(self)
return true, self:_receive_packet()
end
-- Perform one input/output iteration, called by sync receiving loop
function client_mt:_sync_iteration()
return self:_io_iteration(sync_recv)
end
-- Receive packet function - from ioloop's coroutine
local function ioloop_recv(self)
return coroutine_resume(self.connection.coro)
end
-- Perform one input/output iteration, called by ioloop
function client_mt:_ioloop_iteration()
-- working according state
local loop = self.ioloop
local args = self.args
local conn = self.connection
if conn then
-- network connection opened
-- perform packet receiving using ioloop receive function
local ok, err = self:_io_iteration(ioloop_recv)
if ok then
-- send PINGREQ if keep_alive interval is reached
if os_time() - self.send_time >= args.keep_alive then
self:send_pingreq()
end
end
return ok, err
else
-- no connection - first connect, reconnect or remove from ioloop
if self.first_connect then
self.first_connect = false
self:start_connecting()
elseif args.reconnect then
if args.reconnect == true then
self:start_connecting()
else
-- reconnect in specified timeout
if self.reconnect_timer_start then
if os_time() - self.reconnect_timer_start >= args.reconnect then
self.reconnect_timer_start = nil
self:start_connecting()
else
loop:can_sleep()
end
else
self.reconnect_timer_start = os_time()
end
end
else
-- finish working with client
loop:remove(self)
end
end
end
-- Performing one IO iteration - receive next packet
function client_mt:_io_iteration(recv)
local conn = self.connection
-- first - try to receive packet
local ok, packet, err = recv(self)
-- print("received packet", ok, packet, err)
-- check coroutine resume status
if not ok then
err = "failed to resume receive packet coroutine: "..tostring(packet)
self:handle("error", err, self)
self:close_connection("error")
return false, err
end
-- check for communication error
if packet == false then
if err == "closed" then
self:close_connection("connection closed by broker")
return false, err
else
err = "failed to receive next packet: "..err
self:handle("error", err, self)
self:close_connection("error")
return false, err
end
end
-- check some packet received
if packet ~= "timeout" and packet ~= "wantread" then
if not conn.connack then
-- expecting only CONNACK packet here
if packet.type ~= packet_type.CONNACK then
err = "expecting CONNACK but received "..packet.type
self:handle("error", err, self)
self:close_connection("error")
return false, err
end
-- store connack packet in connection
conn.connack = packet
-- check CONNACK rc
if packet.rc ~= 0 then
err = str_format("CONNECT failed with CONNACK [rc=%d]: %s", packet.rc, packet:reason_string())
self:handle("error", err, self, packet)
self:handle("connect", packet, self)
self:close_connection("connection failed")
return false, err
end
-- fire connect event
self:handle("connect", packet, self)
else
-- connection authorized, so process usual packets
-- handle packet according its type
local ptype = packet.type
if ptype == packet_type.PINGRESP then -- luacheck: ignore
-- PINGREQ answer, nothing to do