mqtt.lua 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. --- mqtt client
  2. -- @module mqtt
  3. -- @author 小强
  4. -- @license MIT
  5. -- @copyright openLuat
  6. -- @release 2017.10.24
  7. require "log"
  8. require "socket"
  9. require "utils"
  10. module(..., package.seeall)
  11. -- MQTT 指令id
  12. local CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, PINGREQ, PINGRESP, DISCONNECT = 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14
  13. local CLIENT_COMMAND_TIMEOUT = 5000
  14. local function encodeLen(len)
  15. local s = ""
  16. local digit
  17. repeat
  18. digit = len % 128
  19. len = len / 128
  20. if len > 0 then
  21. digit = bit.bor(digit, 0x80)
  22. end
  23. s = s .. string.char(digit)
  24. until (len <= 0)
  25. return s
  26. end
  27. local function encodeUTF8(s)
  28. if not s or #s == 0 then
  29. return ""
  30. else
  31. return pack.pack(">P", s)
  32. end
  33. end
  34. local function packCONNECT(clientId, keepAlive, username, password, cleanSession, will)
  35. local content = pack.pack(">PbbHPAAAA",
  36. "MQTT",
  37. 4,
  38. (#username == 0 and 0 or 1) * 128 + (#password == 0 and 0 or 1) * 64 + will.retain * 32 + will.qos * 8 + will.flag * 4 + cleanSession * 2,
  39. keepAlive,
  40. clientId,
  41. encodeUTF8(will.topic),
  42. encodeUTF8(will.payload),
  43. encodeUTF8(username),
  44. encodeUTF8(password))
  45. return pack.pack(">bAA",
  46. CONNECT * 16,
  47. encodeLen(string.len(content)),
  48. content)
  49. end
  50. local function packSUBSCRIBE(dup, packetId, topics)
  51. local header = SUBSCRIBE * 16 + dup * 8 + 2
  52. local data = pack.pack(">H", packetId)
  53. for topic, qos in pairs(topics) do
  54. data = data .. pack.pack(">Pb", topic, qos)
  55. end
  56. return pack.pack(">bAA", header, encodeLen(#data), data)
  57. end
  58. local function packPUBLISH(dup, qos, retain, packetId, topic, payload)
  59. local header = PUBLISH * 16 + dup * 8 + qos * 2 + retain
  60. local len = 2 + #topic + #payload
  61. if qos > 0 then
  62. return pack.pack(">bAPHA", header, encodeLen(len + 2), topic, packetId, payload)
  63. else
  64. return pack.pack(">bAPA", header, encodeLen(len), topic, payload)
  65. end
  66. end
  67. local function packACK(id, dup, packetId)
  68. return pack.pack(">bbH", id * 16 + dup * 8 + (id == PUBREL and 1 or 0) * 2, 0x02, packetId)
  69. end
  70. local function packZeroData(id, dup, qos, retain)
  71. dup = dup or 0
  72. qos = qos or 0
  73. retain = retain or 0
  74. return pack.pack(">bb", id * 16 + dup * 8 + qos * 2 + retain, 0)
  75. end
  76. local function unpack(s)
  77. if #s < 2 then return end
  78. log.debug("mqtt.unpack", #s, string.tohex(string.sub(s, 1, 50)))
  79. -- read remaining length
  80. local len = 0
  81. local multiplier = 1
  82. local pos = 2
  83. repeat
  84. if pos > #s then return end
  85. local digit = string.byte(s, pos)
  86. len = len + ((digit % 128) * multiplier)
  87. multiplier = multiplier * 128
  88. pos = pos + 1
  89. until digit < 128
  90. if #s < len + pos - 1 then return end
  91. local header = string.byte(s, 1)
  92. local packet = {id = header / 16, dup = (header % 16) / 8, qos = bit.band(header, 0x06) / 2}
  93. local nextpos
  94. if packet.id == CONNACK then
  95. nextpos, packet.ackFlag, packet.rc = pack.unpack(s, "bb", pos)
  96. elseif packet.id == PUBLISH then
  97. nextpos, packet.topic = pack.unpack(s, ">P", pos)
  98. if packet.qos > 0 then
  99. nextpos, packet.packetId = pack.unpack(s, ">H", nextpos)
  100. end
  101. packet.payload = string.sub(s, nextpos, pos + len - 1)
  102. elseif packet.id ~= PINGRESP then
  103. if len >= 2 then
  104. nextpos, packet.packetId = pack.unpack(s, ">H", pos)
  105. else
  106. packet.packetId = 0
  107. end
  108. end
  109. return packet, pos + len
  110. end
  111. local mqttc = {}
  112. mqttc.__index = mqttc
  113. --- mqtt.client() 创建mqtt client实例
  114. -- @param clientId
  115. -- @param keepAlive 心跳间隔,默认300秒
  116. -- @param username 用户名为空请输入nil
  117. -- @param password 密码为空请输入nil
  118. -- @param cleanSession 1/0
  119. -- @param will 遗嘱参数(Last Will/Testament), 若不打开LWT请不要传递该参数,传递参数默认为打开LWT,will.qos, will.retain, will.topic, will.payload
  120. -- @return result true - 成功,false - 失败
  121. -- @usage
  122. -- mqttc = mqtt.client("clientid-123", nil, nil, false)
  123. function client(clientId, keepAlive, username, password, cleanSession, will)
  124. local o = {}
  125. local packetId = 1
  126. if will then
  127. will.flag = 1
  128. else
  129. will = {flag = 0, qos = 0, retain = 0, topic = "", payload = ""}
  130. end
  131. o.clientId = clientId
  132. o.keepAlive = keepAlive or 300
  133. o.username = username or ""
  134. o.password = password or ""
  135. o.cleanSession = cleanSession or 1
  136. o.will = will
  137. o.commandTimeout = CLIENT_COMMAND_TIMEOUT
  138. o.cache = {}-- 接收到的mqtt数据包缓冲
  139. o.inbuf = "" -- 未完成的数据缓冲
  140. o.connected = false
  141. o.getNextPacketId = function()
  142. packetId = packetId == 65535 and 1 or (packetId + 1)
  143. return packetId
  144. end
  145. o.lastIOTime = 0
  146. setmetatable(o, mqttc)
  147. return o
  148. end
  149. -- 检测是否需要发送心跳包
  150. function mqttc:checkKeepAlive()
  151. if self.keepAlive == 0 then return true end
  152. if os.time() - self.lastIOTime >= self.keepAlive then
  153. if not self:write(packZeroData(PINGREQ)) then
  154. log.info("mqtt.client:checkKeepAlive", "pingreq send fail")
  155. return false
  156. end
  157. end
  158. return true
  159. end
  160. -- 发送mqtt数据
  161. function mqttc:write(data)
  162. log.debug("mqtt.client:write", string.tohex(string.sub(data, 1, 50)))
  163. local r = self.io:send(data)
  164. if r then self.lastIOTime = os.time() end
  165. return r
  166. end
  167. -- 接收mqtt数据包
  168. function mqttc:read(timeout)
  169. if not self:checkKeepAlive() then return false end
  170. -- 处理之前缓冲的数据
  171. local packet, nextpos = unpack(self.inbuf)
  172. if packet then
  173. self.inbuf = string.sub(self.inbuf, nextpos)
  174. return true, packet
  175. end
  176. while true do
  177. local recvTimeout
  178. if self.keepAlive == 0 then
  179. recvTimeout = timeout
  180. else
  181. local kaTimeout = (self.keepAlive - (os.time() - self.lastIOTime)) * 1000
  182. recvTimeout = kaTimeout > timeout and timeout or kaTimeout
  183. end
  184. local r, s = self.io:recv(recvTimeout == 0 and 5 or recvTimeout)
  185. if r then
  186. self.inbuf = self.inbuf .. s
  187. elseif s == "timeout" then -- 超时,判断是否需要发送心跳包
  188. if not self:checkKeepAlive() then
  189. return false
  190. elseif timeout <= recvTimeout then
  191. return false, "timeout"
  192. else
  193. timeout = timeout - recvTimeout
  194. end
  195. else -- 其他错误直接返回
  196. return r, s
  197. end
  198. local packet, nextpos = unpack(self.inbuf)
  199. if packet then
  200. self.lastIOTime = os.time()
  201. self.inbuf = string.sub(self.inbuf, nextpos)
  202. if packet.id ~= PINGRESP then
  203. return true, packet
  204. end
  205. end
  206. end
  207. end
  208. -- 等待接收指定的mqtt消息
  209. function mqttc:waitfor(id, timeout)
  210. for index, packet in ipairs(self.cache) do
  211. if packet.id == id then
  212. return true, table.remove(self.cache, index)
  213. end
  214. end
  215. while true do
  216. local r, data = self:read(timeout)
  217. if r then
  218. if data.id == PUBLISH then
  219. if data.qos > 0 then
  220. if not self:write(packACK(data.qos == 1 and PUBACK or PUBREC, 0, data.packetId)) then
  221. log.info("mqtt.client:waitfor", "send publish ack failed", data.qos)
  222. return false
  223. end
  224. end
  225. elseif data.id == PUBREC or data.id == PUBREL then
  226. if not self:write(packACK(data.id == PUBREC and PUBREL or PUBCOMP, 0, data.packetId)) then
  227. log.info("mqtt.client:waitfor", "send ack fail", data.id == PUBREC and "PUBREC" or "PUBCOMP")
  228. return false
  229. end
  230. end
  231. if data.id == id then
  232. return true, data
  233. end
  234. table.insert(self.cache, data)
  235. else
  236. return false, data
  237. end
  238. end
  239. end
  240. --- 连接mqtt服务器
  241. -- @param host 地址
  242. -- @param port 端口
  243. -- @param transport tcp
  244. -- @return result true - 成功,false - 失败
  245. -- @usage mqttc = mqtt.client("clientid-123", nil, nil, false); mqttc:connect("mqttserver.com", 1883, "tcp")
  246. function mqttc:connect(host, port, transport)
  247. if self.connected then
  248. log.info("mqtt.client:connect", "has connected")
  249. return false
  250. end
  251. if self.io then
  252. self.io:close()
  253. self.io = nil
  254. end
  255. if transport and transport ~= "tcp" then
  256. log.info("mqtt.client:connect", "invalid transport", transport)
  257. return false
  258. end
  259. self.io = socket.tcp()
  260. if not self.io:connect(host, port) then
  261. log.info("mqtt.client:connect", "connect host fail")
  262. return false
  263. end
  264. if not self:write(packCONNECT(self.clientId, self.keepAlive, self.username, self.password, self.cleanSession, self.will)) then
  265. log.info("mqtt.client:connect", "send fail")
  266. return false
  267. end
  268. local r, packet = self:waitfor(CONNACK, self.commandTimeout)
  269. if not r or packet.rc ~= 0 then
  270. log.info("mqtt.client:connect", "connack error", r and packet.rc or -1)
  271. return false
  272. end
  273. self.connected = true
  274. return true
  275. end
  276. --- mqtt.client:subscribe(topic, qos)
  277. -- @param topic
  278. -- @param qos 0/1/2
  279. -- @return result true - 成功,false - 失败
  280. -- @usage
  281. -- mqttc:subscribe("/abc", 0) -- subscribe topic "/abc" with qos = 0
  282. -- mqttc:subscribe({["/topic1"] = 0, ["/topic2"] = 1, ["/topic3"] = 2}) -- subscribe multi topic
  283. function mqttc:subscribe(topic, qos)
  284. if not self.connected then
  285. log.info("mqtt.client:subscribe", "not connected")
  286. return false
  287. end
  288. local topics
  289. if type(topic) == "string" then
  290. topics = {[topic] = qos and qos or 0}
  291. else
  292. topics = topic
  293. end
  294. if not self:write(packSUBSCRIBE(0, self.getNextPacketId(), topics)) then
  295. log.info("mqtt.client:subscribe", "send failed")
  296. return false
  297. end
  298. if not self:waitfor(SUBACK, self.commandTimeout) then
  299. log.info("mqtt.client:subscribe", "wait ack failed")
  300. return false
  301. end
  302. return true
  303. end
  304. --- mqtt.client:publish(topic, payload, qos, retain)
  305. -- @param topic
  306. -- @param payload
  307. -- @param qos 0/1/2, default 0
  308. -- @param retain
  309. -- @return mid mqtt message id
  310. -- @usage
  311. -- mqttc = mqtt.client("clientid-123", nil, nil, false)
  312. -- mqttc:connect("mqttserver.com", 1883, "tcp")
  313. -- mqttc:publish("/topic", "publish from luat mqtt client", 0)
  314. function mqttc:publish(topic, payload, qos, retain)
  315. if not self.connected then
  316. log.info("mqtt.client:publish", "not connected")
  317. return false
  318. end
  319. qos = qos or 0
  320. retain = retain or 0
  321. if not self:write(packPUBLISH(0, qos, retain, qos > 0 and self.getNextPacketId() or 0, topic, payload)) then
  322. log.info("mqtt.client:publish", "socket send failed")
  323. return false
  324. end
  325. if qos == 0 then return true end
  326. if not self:waitfor(qos == 1 and PUBACK or PUBCOMP, self.commandTimeout) then
  327. log.warn("mqtt.client:publish", "wait ack timeout")
  328. return false
  329. end
  330. return true
  331. end
  332. --- mqtt.client:receive([timeout])
  333. -- @return result true - 成功,false - 失败
  334. -- @return data 如果成功返回的时候接收到的服务器发过来的包,如果失败返回的是错误信息
  335. -- @usage
  336. -- true, packet = mqttc:receive()
  337. -- false, error_message = mqttc:receive()
  338. function mqttc:receive(timeout)
  339. if not self.connected then
  340. log.info("mqtt.client:receive", "not connected")
  341. return false
  342. end
  343. return self:waitfor(PUBLISH, timeout)
  344. end
  345. --- mqtt disconnect
  346. -- @return result true - 成功,false - 失败
  347. -- @usage
  348. -- mqttc = mqtt.client("clientid-123", nil, nil, false)
  349. -- mqttc:connect("mqttserver.com", 1883, "tcp")
  350. -- process data
  351. -- mqttc:disconnect()
  352. function mqttc:disconnect()
  353. if self.connected then
  354. self:write(packZeroData(DISCONNECT))
  355. end
  356. if self.io then
  357. self.io:close()
  358. self.io = nil
  359. end
  360. self.cache = {}
  361. self.inbuf = ""
  362. self.connected = false
  363. end