mqtt.lua 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. --- 模块功能:MQTT客户端
  2. -- @module mqtt
  3. -- @author openLuat
  4. -- @license MIT
  5. -- @copyright openLuat
  6. -- @release 2017.10.24
  7. require "log"
  8. require "socket"
  9. require "utils"
  10. module(..., package.seeall)
  11. -- MQTT 指令id
  12. local CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, PINGREQ, PINGRESP, DISCONNECT = 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14
  13. local CLIENT_COMMAND_TIMEOUT = 60000
  14. local function encodeLen(len)
  15. local s = ""
  16. local digit
  17. repeat
  18. digit = len % 128
  19. len = (len - digit) / 128
  20. if len > 0 then
  21. digit = bit.bor(digit, 0x80)
  22. end
  23. s = s .. string.char(digit)
  24. until (len <= 0)
  25. return s
  26. end
  27. local function encodeUTF8(s)
  28. if not s or #s == 0 then
  29. return ""
  30. else
  31. return pack.pack(">P", s)
  32. end
  33. end
  34. local function packCONNECT(clientId, keepAlive, username, password, cleanSession, will, version)
  35. local content = pack.pack(">PbbHPAAAA",
  36. version == "3.1" and "MQIsdp" or "MQTT",
  37. version == "3.1" and 3 or 4,
  38. (#username == 0 and 0 or 1) * 128 + (#password == 0 and 0 or 1) * 64 + will.retain * 32 + will.qos * 8 + will.flag * 4 + cleanSession * 2,
  39. keepAlive,
  40. clientId,
  41. encodeUTF8(will.topic),
  42. encodeUTF8(will.payload),
  43. encodeUTF8(username),
  44. encodeUTF8(password))
  45. return pack.pack(">bAA",
  46. CONNECT * 16,
  47. encodeLen(string.len(content)),
  48. content)
  49. end
  50. local function packSUBSCRIBE(dup, packetId, topics)
  51. local header = SUBSCRIBE * 16 + dup * 8 + 2
  52. local data = pack.pack(">H", packetId)
  53. for topic, qos in pairs(topics) do
  54. data = data .. pack.pack(">Pb", topic, qos)
  55. end
  56. return pack.pack(">bAA", header, encodeLen(#data), data)
  57. end
  58. local function packUNSUBSCRIBE(dup, packetId, topics)
  59. local header = UNSUBSCRIBE * 16 + dup * 8 + 2
  60. local data = pack.pack(">H", packetId)
  61. for k, topic in pairs(topics) do
  62. data = data .. pack.pack(">P", topic)
  63. end
  64. return pack.pack(">bAA", header, encodeLen(#data), data)
  65. end
  66. local function packPUBLISH(dup, qos, retain, packetId, topic, payload)
  67. local header = PUBLISH * 16 + dup * 8 + qos * 2 + retain
  68. local len = 2 + #topic + #payload
  69. if qos > 0 then
  70. return pack.pack(">bAPHA", header, encodeLen(len + 2), topic, packetId, payload)
  71. else
  72. return pack.pack(">bAPA", header, encodeLen(len), topic, payload)
  73. end
  74. end
  75. local function packACK(id, dup, packetId)
  76. return pack.pack(">bbH", id * 16 + dup * 8 + (id == PUBREL and 1 or 0) * 2, 0x02, packetId)
  77. end
  78. local function packZeroData(id, dup, qos, retain)
  79. dup = dup or 0
  80. qos = qos or 0
  81. retain = retain or 0
  82. return pack.pack(">bb", id * 16 + dup * 8 + qos * 2 + retain, 0)
  83. end
  84. local function unpack(s)
  85. if #s < 2 then return end
  86. log.debug("mqtt.unpack", #s, string.toHex(string.sub(s, 1, 50)))
  87. -- read remaining length
  88. local len = 0
  89. local multiplier = 1
  90. local pos = 2
  91. repeat
  92. if pos > #s then return end
  93. local digit = string.byte(s, pos)
  94. len = len + ((digit % 128) * multiplier)
  95. multiplier = multiplier * 128
  96. pos = pos + 1
  97. until digit < 128
  98. if #s < len + pos - 1 then return end
  99. local header = string.byte(s, 1)
  100. local packet = {id = (header - (header % 16)) / 16, dup = ((header % 16) - ((header % 16) % 8)) / 8, qos = bit.band(header, 0x06) / 2, retain = bit.band(header, 0x01)}
  101. local nextpos
  102. if packet.id == CONNACK then
  103. nextpos, packet.ackFlag, packet.rc = pack.unpack(s, "bb", pos)
  104. elseif packet.id == PUBLISH then
  105. nextpos, packet.topic = pack.unpack(s, ">P", pos)
  106. if packet.qos > 0 then
  107. nextpos, packet.packetId = pack.unpack(s, ">H", nextpos)
  108. end
  109. packet.payload = string.sub(s, nextpos, pos + len - 1)
  110. elseif packet.id ~= PINGRESP then
  111. if len >= 2 then
  112. nextpos, packet.packetId = pack.unpack(s, ">H", pos)
  113. else
  114. packet.packetId = 0
  115. end
  116. end
  117. return packet, pos + len
  118. end
  119. local mqttc = {}
  120. mqttc.__index = mqttc
  121. --- 创建一个mqtt client实例
  122. -- @string clientId
  123. -- @number[opt=300] keepAlive 心跳间隔(单位为秒),默认300秒
  124. -- @string[opt=""] username 用户名,用户名为空配置为""或者nil
  125. -- @string[opt=""] password 密码,密码为空配置为""或者nil
  126. -- @number[opt=1] cleanSession 1/0
  127. -- @table[opt=nil] will 遗嘱参数,格式为{qos=, retain=, topic=, payload=}
  128. -- @string[opt="3.1.1"] version MQTT版本号
  129. -- @return table mqttc client实例
  130. -- @usage
  131. -- mqttc = mqtt.client("clientid-123")
  132. -- mqttc = mqtt.client("clientid-123",200)
  133. -- mqttc = mqtt.client("clientid-123",nil,"user","password")
  134. -- mqttc = mqtt.client("clientid-123",nil,"user","password",nil,nil,"3.1")
  135. function client(clientId, keepAlive, username, password, cleanSession, will, version)
  136. local o = {}
  137. local packetId = 1
  138. if will then
  139. will.flag = 1
  140. else
  141. will = {flag = 0, qos = 0, retain = 0, topic = "", payload = ""}
  142. end
  143. o.clientId = clientId
  144. o.keepAlive = keepAlive or 300
  145. o.username = username or ""
  146. o.password = password or ""
  147. o.cleanSession = cleanSession or 1
  148. o.version = version or "3.1.1"
  149. o.will = will
  150. o.commandTimeout = CLIENT_COMMAND_TIMEOUT
  151. o.cache = {}-- 接收到的mqtt数据包缓冲
  152. o.inbuf = "" -- 未完成的数据缓冲
  153. o.connected = false
  154. o.getNextPacketId = function()
  155. packetId = packetId == 65535 and 1 or (packetId + 1)
  156. return packetId
  157. end
  158. o.lastOTime = 0
  159. setmetatable(o, mqttc)
  160. return o
  161. end
  162. -- 检测是否需要发送心跳包
  163. function mqttc:checkKeepAlive()
  164. if self.keepAlive == 0 then return true end
  165. if os.time() - self.lastOTime >= self.keepAlive then
  166. if not self:write(packZeroData(PINGREQ)) then
  167. log.info("mqtt.client:", "pingreq send fail")
  168. return false
  169. end
  170. end
  171. return true
  172. end
  173. -- 发送mqtt数据
  174. function mqttc:write(data)
  175. log.debug("mqtt.client:write", string.toHex(string.sub(data, 1, 50)))
  176. local r = self.io:send(data)
  177. if r then self.lastOTime = os.time() end
  178. return r
  179. end
  180. -- 接收mqtt数据包
  181. function mqttc:read(timeout, msg, msgNoResume)
  182. if not self:checkKeepAlive() then
  183. log.warn("mqtt.read checkKeepAlive fail")
  184. return false
  185. end
  186. -- 处理之前缓冲的数据
  187. local packet, nextpos = unpack(self.inbuf)
  188. if packet then
  189. self.inbuf = string.sub(self.inbuf, nextpos)
  190. return true, packet
  191. end
  192. while true do
  193. local recvTimeout
  194. if self.keepAlive == 0 then
  195. recvTimeout = timeout
  196. else
  197. local kaTimeout = (self.keepAlive - (os.time() - self.lastOTime)) * 1000
  198. recvTimeout = kaTimeout > timeout and timeout or kaTimeout
  199. end
  200. local r, s, p = self.io:recv(recvTimeout == 0 and 5 or recvTimeout, msg, msgNoResume)
  201. if r then
  202. self.inbuf = self.inbuf .. s
  203. elseif s == "timeout" then -- 超时,判断是否需要发送心跳包
  204. if not self:checkKeepAlive() then
  205. return false
  206. elseif timeout <= recvTimeout then
  207. return false, "timeout"
  208. else
  209. timeout = timeout - recvTimeout
  210. end
  211. else -- 其他错误直接返回
  212. return r, s, p
  213. end
  214. local packet, nextpos = unpack(self.inbuf)
  215. if packet then
  216. --self.lastIOTime = os.time()
  217. self.inbuf = string.sub(self.inbuf, nextpos)
  218. if packet.id ~= PINGRESP then
  219. return true, packet
  220. end
  221. end
  222. end
  223. end
  224. -- 等待接收指定的mqtt消息
  225. function mqttc:waitfor(id, timeout, msg, msgNoResume)
  226. for index, packet in ipairs(self.cache) do
  227. if packet.id == id then
  228. return true, table.remove(self.cache, index)
  229. end
  230. end
  231. while true do
  232. local insertCache = true
  233. local r, data, param = self:read(timeout, msg, msgNoResume)
  234. if r then
  235. if data.id == PUBLISH then
  236. if data.qos > 0 then
  237. if not self:write(packACK(data.qos == 1 and PUBACK or PUBREC, 0, data.packetId)) then
  238. log.info("mqtt.client:waitfor", "send publish ack failed", data.qos)
  239. return false
  240. end
  241. end
  242. elseif data.id == PUBREC or data.id == PUBREL then
  243. if not self:write(packACK(data.id == PUBREC and PUBREL or PUBCOMP, 0, data.packetId)) then
  244. log.info("mqtt.client:waitfor", "send ack fail", data.id == PUBREC and "PUBREC" or "PUBCOMP")
  245. return false
  246. end
  247. insertCache = false
  248. end
  249. if data.id == id then
  250. return true, data
  251. end
  252. if insertCache then table.insert(self.cache, data) end
  253. else
  254. return false, data, param
  255. end
  256. end
  257. end
  258. --- 连接mqtt服务器
  259. -- @string host 服务器地址
  260. -- @param port string或者number类型,服务器端口
  261. -- @string[opt="tcp"] transport "tcp"或者"tcp_ssl"
  262. -- @table[opt=nil] cert,table或者nil类型,ssl证书,当transport为"tcp_ssl"时,此参数才有意义。cert格式如下:
  263. -- {
  264. -- caCert = "ca.crt", --CA证书文件(Base64编码 X.509格式),如果存在此参数,则表示客户端会对服务器的证书进行校验;不存在则不校验
  265. -- clientCert = "client.crt", --客户端证书文件(Base64编码 X.509格式),服务器对客户端的证书进行校验时会用到此参数
  266. -- clientKey = "client.key", --客户端私钥文件(Base64编码 X.509格式)
  267. -- clientPassword = "123456", --客户端证书文件密码[可选]
  268. -- }
  269. -- @number timeout, 链接服务器最长超时时间
  270. -- @return result true表示成功,false或者nil表示失败
  271. -- @usage mqttc = mqtt.client("clientid-123", nil, nil, false); mqttc:connect("mqttserver.com", 1883, "tcp", 5)
  272. function mqttc:connect(host, port, transport, cert, timeout)
  273. if self.connected then
  274. log.info("mqtt.client:connect", "has connected")
  275. return false
  276. end
  277. if self.io then
  278. self.io:close()
  279. self.io = nil
  280. end
  281. if transport and transport ~= "tcp" and transport ~= "tcp_ssl" then
  282. log.info("mqtt.client:connect", "invalid transport", transport)
  283. return false
  284. end
  285. self.io = socket.tcp(transport == "tcp_ssl" or type(cert) == "table", cert)
  286. if not self.io:connect(host, port, timeout) then
  287. log.info("mqtt.client:connect", "connect host fail")
  288. return false
  289. end
  290. if not self:write(packCONNECT(self.clientId, self.keepAlive, self.username, self.password, self.cleanSession, self.will, self.version)) then
  291. log.info("mqtt.client:connect", "send fail")
  292. return false
  293. end
  294. local r, packet = self:waitfor(CONNACK, self.commandTimeout, nil, true)
  295. if not r or packet.rc ~= 0 then
  296. log.info("mqtt.client:connect", "connack error", r and packet.rc or -1)
  297. return false
  298. end
  299. self.connected = true
  300. return true
  301. end
  302. --- 订阅主题
  303. -- @param topic,string或者table类型,一个主题时为string类型,多个主题时为table类型,主题内容为UTF8编码
  304. -- @param[opt=0] qos,number或者nil,topic为一个主题时,qos为number类型(0/1/2,默认0);topic为多个主题时,qos为nil
  305. -- @return bool true表示成功,false或者nil表示失败
  306. -- @usage
  307. -- mqttc:subscribe("/abc", 0) -- subscribe topic "/abc" with qos = 0
  308. -- mqttc:subscribe({["/topic1"] = 0, ["/topic2"] = 1, ["/topic3"] = 2}) -- subscribe multi topic
  309. function mqttc:subscribe(topic, qos)
  310. if not self.connected then
  311. log.info("mqtt.client:subscribe", "not connected")
  312. return false
  313. end
  314. local topics
  315. if type(topic) == "string" then
  316. topics = {[topic] = qos and qos or 0}
  317. else
  318. topics = topic
  319. end
  320. if not self:write(packSUBSCRIBE(0, self.getNextPacketId(), topics)) then
  321. log.info("mqtt.client:subscribe", "send failed")
  322. return false
  323. end
  324. if not self:waitfor(SUBACK, self.commandTimeout, nil, true) then
  325. log.info("mqtt.client:subscribe", "wait ack failed")
  326. return false
  327. end
  328. return true
  329. end
  330. --- 取消订阅主题
  331. -- @param topic,string或者table类型,一个主题时为string类型,多个主题时为table类型,主题内容为UTF8编码
  332. -- @return bool true表示成功,false或者nil表示失败
  333. -- @usage
  334. -- mqttc:unsubscribe("/abc") -- unsubscribe topic "/abc"
  335. -- mqttc:unsubscribe({"/topic1", "/topic2", "/topic3"}) -- unsubscribe multi topic
  336. function mqttc:unsubscribe(topic)
  337. if not self.connected then
  338. log.info("mqtt.client:unsubscribe", "not connected")
  339. return false
  340. end
  341. local topics
  342. if type(topic) == "string" then
  343. topics = {topic}
  344. else
  345. topics = topic
  346. end
  347. if not self:write(packUNSUBSCRIBE(0, self.getNextPacketId(), topics)) then
  348. log.info("mqtt.client:unsubscribe", "send failed")
  349. return false
  350. end
  351. if not self:waitfor(UNSUBACK, self.commandTimeout, nil, true) then
  352. log.info("mqtt.client:unsubscribe", "wait ack failed")
  353. return false
  354. end
  355. return true
  356. end
  357. --- 发布一条消息
  358. -- @string topic UTF8编码的字符串
  359. -- @string payload 用户自己控制payload的编码,mqtt.lua不会对payload做任何编码转换
  360. -- @number[opt=0] qos 0/1/2, default 0
  361. -- @number[opt=0] retain 0或者1
  362. -- @return bool 发布成功返回true,失败返回false
  363. -- @usage
  364. -- mqttc = mqtt.client("clientid-123", nil, nil, false)
  365. -- mqttc:connect("mqttserver.com", 1883, "tcp")
  366. -- mqttc:publish("/topic", "publish from luat mqtt client", 0)
  367. function mqttc:publish(topic, payload, qos, retain)
  368. if not self.connected then
  369. log.info("mqtt.client:publish", "not connected")
  370. return false
  371. end
  372. qos = qos or 0
  373. retain = retain or 0
  374. if not self:write(packPUBLISH(0, qos, retain, qos > 0 and self.getNextPacketId() or 0, topic, payload)) then
  375. log.info("mqtt.client:publish", "socket send failed")
  376. return false
  377. end
  378. if qos == 0 then return true end
  379. if not self:waitfor(qos == 1 and PUBACK or PUBCOMP, self.commandTimeout, nil, true) then
  380. log.warn("mqtt.client:publish", "wait ack timeout")
  381. return false
  382. end
  383. return true
  384. end
  385. --- 接收消息
  386. -- @number timeout 接收超时时间,单位毫秒
  387. -- @string[opt=nil] msg 可选参数,控制socket所在的线程退出recv阻塞状态
  388. -- @return result 数据接收结果,true表示成功,false表示失败
  389. -- @return data 如果result为true,表示服务器发过来的包;如果result为false,表示错误信息,超时失败时为"timeout"
  390. -- @return param msg控制退出时,返回msg的字符串
  391. -- @usage
  392. -- true, packet = mqttc:receive(2000)
  393. -- false, error_message = mqttc:receive(2000)
  394. -- false, msg, para = mqttc:receive(2000)
  395. function mqttc:receive(timeout, msg)
  396. if not self.connected then
  397. log.info("mqtt.client:receive", "not connected")
  398. return false
  399. end
  400. return self:waitfor(PUBLISH, timeout, msg)
  401. end
  402. --- 断开与服务器的连接
  403. -- @return nil
  404. -- @usage
  405. -- mqttc = mqtt.client("clientid-123", nil, nil, false)
  406. -- mqttc:connect("mqttserver.com", 1883, "tcp")
  407. -- process data
  408. -- mqttc:disconnect()
  409. function mqttc:disconnect()
  410. if self.io then
  411. if self.connected then self:write(packZeroData(DISCONNECT)) end
  412. self.io:close()
  413. self.io = nil
  414. end
  415. self.cache = {}
  416. self.inbuf = ""
  417. self.connected = false
  418. end