Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

物联网宠儿mqtt.js那些事儿 #204

Open
FrankKai opened this issue Apr 8, 2020 · 10 comments
Open

物联网宠儿mqtt.js那些事儿 #204

FrankKai opened this issue Apr 8, 2020 · 10 comments

Comments

@FrankKai
Copy link
Owner

FrankKai commented Apr 8, 2020

image

常见的mq有Kafka,RocketMQ和RabbitMQ,大家也很常见。 前者很常见,属于微服务间的mq。那么MQTT是什么呢?MQTT属于IoT也就是物联网的概念。
快来和使用mqtt.js开发IM功能2年的作者一探究竟吧~

先来看下MQTT在物联网领域的应用场景:
image

mqtt.js是MQTT在nodejs端的实现。
通过npm package.json包管理,现代vue技术栈下的前端也可用,比如用vue-cli,create-react-app等等构建的项目。

mqtt.js官方为微信小程序和支付宝小程序也做了支持。微信小程序的MQTT协议名为wxs,支付宝小程序则是alis

如果还是一脸懵逼,那么就跟随我通过mqtt.js去认识一下这个物联网领域的宠儿吧。

  • 什么是微消息队列?
  • MQTT关键名词解释
  • P2P消息和Pub/Sub消息
  • 封装的mqtt.js通用class
  • 客户端发包函数sendPacket
  • 客户端连接 mqtt.connect()
  • 订阅topic mqtt.Client#subscribe()
  • 发送消息 mqtt.Client#publish()
  • 接收消息 mqtt.Client#“message”事件
@FrankKai
Copy link
Owner Author

FrankKai commented Apr 8, 2020

什么是微消息队列?

消息队列一般分为两种:

  • 微服务消息队列(微服务间信息传递,典型代表有RabbitMQ,Kafka,RocketMQ)
  • 物联网消息队列(物联网端与云端消息传递,代表有MQTT)

目前我实践过的,也就是我们本篇博文深入分析的,是物联网消息队列的mqtt.js。

传统的消息队列(微服务间信息传递)

传统的微服务间(多个子系统服务端间)消息队列是一种非常常见的服务端间消息传递的方式。

典型代表有:RabbitMQ,Kafka,RocketMQ。
阿里云官网拥有AMQP(兼容RabbitMQ),Kafka,和RocketMQ这三种微服务消息队列,对于我们今后在实际项目中落地提供了很大的帮助。

更多微服务消息队列可查看:node-mq-tutorial

使用场景多种多样:

  • 高并发:秒杀、抢票(FIFO)
  • 共享型:积分兑换(多子系统共用积分模块)
  • 通信型:服务端间消息传递(nodejs,java,python,go等等)

MQTT消息队列(物联网端与云间消息传递)

MQTT是一个物联网MQTT协议,主要解决的是物联网IoT网络情况复杂的问题。

阿里云有MQTT消息队列服务。通信协议支持MQTT,STOMP,GB-808等。数据传输层支持TCP长连接、SSL加密、Websocket等。

使用场景主要为数据传输:

  1. 车联网(远程控制,汽车数据上传)
  2. IM通讯(1对1单聊,1对多朋友圈)
  3. 视频直播(弹幕通知,聊天互动)
  4. 智能家居(电器数据上传,遥控指令)

目前我手上负责的运行了2年的聊天系统就是使用的这个服务,我们主要按照设备<->server<->PC的方式,MQTT协议,Websocket传输协议进行设备与PC间的数据通信。

@FrankKai
Copy link
Owner Author

FrankKai commented Apr 8, 2020

MQTT关键名词解释

实例(Instance)

每个MQTT实例都对应一个全局唯一的服务接入点。
肉眼可见的区别就是在通过mqtt.connect(url)与server(broker)建立连接时,broker的url都是一致的。
假设有saleman1,salesman2···他们本地的前端与服务端间建立连接的url都是统一的,只是在clientId进行区分即可。

客户端Id(Client ID)

MQTT的Client ID是每个客户端的唯一标识,要求全局都是唯一的,使用同一个Client ID连接会被拒绝。
阿里云的ClientID由两部分组成 <GroupID>@@@<DeviceID>
通常情况下Group ID是多前端统一的,比如PC端,安卓移动端,ios移动端,DeviceID也是多前端统一的。
那么如何区分多端呢?可以对Client ID中间的@@@做修改。
比如:

let CID_PC = `<GroupID>@@@-PC<DeviceID>`
let CID_Android = `<GroupID>@@@-Android<DeviceID>`
let CID_IOS = `<GroupID>@@@-IOS<DeviceID>`

组Id(Group ID)

用于指定一组逻辑功能完全一致的节点公用的组名,代表的是一类相同功能的设备。

Device ID

每个设备独一无二的标识。这个需要保证全局唯一,可以是每个传感器设备的序列号,可以是登录PC的userId。

父主题(Parent Topic)

MQTT协议基于Pub/Sub模型,任何消息都属于一个Topic。
Topic可以存在多级,第一级为父级Topic。
需要控制台单独创建。

子主题(Subtopic)

MQTT可以有二级Topic,也可以有三级Topic。
无需创建,代码中直接写即可。

@FrankKai
Copy link
Owner Author

FrankKai commented Apr 8, 2020

P2P消息和Pub/Sub消息

Pub/Sub消息就是订阅和发布的模式,类似事件监听和广播。
如果对发布订阅不理解,可以去看Webhook到底是个啥?
MQTT除了支持Pub/Sub的模式,还支持P2P的模式。

什么是P2P消息?

  • P2P,全称为(Point to Point)。
  • 一对一的消息收发模式,只有一个消息发送者和一个消息接收者。
  • P2P模式下,消息发送者明确知道消息的预期接收者,并且这个消息只能被这个特定的客户端消费
  • 发送者发送消息时,通过Topic指定接收者,接收者无需订阅即可获得该消息。
  • P2P 模式不仅降低注册订阅的成本,而且因为对链路有优化,所以降低推送延迟。

P2P模式和Pub/Sub模式的区别

发送消息时

  • Pub/Sub模式下,发送者需要按照与接受者约定好的Topic发送消息
  • P2P模式下,发送者无需按照Tpic发送,可以直接按照规范进行发送

接收消息时

  • Pub/Sub模式下,接收者需要提前订阅topic才能接消息
  • P2P模式下无需订阅即可接收消息

nodejs发送P2P消息

const p2pTopic =topic+"/p2p/GID_xxxx@@@DEVICEID_001";
mqtt.client.publish(p2pTopic);

@FrankKai
Copy link
Owner Author

FrankKai commented Apr 8, 2020

封装的mqtt.js通用class

  • 客户端连接 initClient(config)
  • 订阅topic subscribeTopic(topic, config)
  • 发送消息 publishMessage(message)
  • 接收消息 handleMessage(callback)
import mqtt from 'mqtt';
import config from '@/config';

export default class MQTT {
  constructor(options) {
    this.name = options.name;
    this.connecting = false;
  }
  /**
   * 客户端连接
   */
  initClient(config) {
    const { url, groupId, key, password, topic: { publish: publishTopic }} = config;
    return new Promise((resolve) => {
      this.client = mqtt.connect(
        {
          url,
          clientId: `${groupId}@@@${deviceId}`,
          username: key,
          password,
        }
      );
      this.client.on('connect', () => {
        this.connecting = true;
        resolve(this);
      });
    });
  }

  /**
   * 订阅topic
   */
  subscribeTopic(topic, config) {
    if (this.connecting) {
      this.client.subscribe(topic, config);
    }
    return this;
  }

  /**
   * 发送消息
   */
  publishMessage(message) {
    this.client.publish(publishTopic, message, { qos: 1 });
  }

  /**
   * 接收消息
   */
  handleMessage(callback) {
    if (!this.client._events.message) {
      this.client.on('message', callback);
    }
  }

}

@FrankKai
Copy link
Owner Author

FrankKai commented Apr 8, 2020

客户端发包函数sendPacket

mqtt-packet生成一个可传输buffer

var mqtt = require('mqtt-packet')
var object = {
  cmd: 'publish',
  retain: false,
  qos: 0,
  dup: false,
  length: 10,
  topic: 'test',
  payload: 'test' // Can also be a Buffer
}
var opts = { protocolVersion: 4 } // default is 4. Usually, opts is a connect packet

console.log(mqtt.generate(object))
// Prints:
//
// <Buffer 30 0a 00 04 74 65 73 74 74 65 73 74>
//
// Which is the same as:
//
// new Buffer([
//   48, 10, // Header (publish)
//   0, 4, // Topic length
//   116, 101, 115, 116, // Topic (test)
//   116, 101, 115, 116 // Payload (test)
// ])

sendPacket函数

发出packetsend事件并且通过mqtt.writeToStream将packet写入client的stream中。

var mqttPacket = require('mqtt-packet')

function sendPacket (client, packet) {
  client.emit('packetsend', packet)
  mqttPacket.writeToStream(packet, client.stream, client.options)
}

_sendPack方法

MqttClient.prototype._sendPacket = function (packet) {
     sendPacket(this, packet);
}

@FrankKai
Copy link
Owner Author

FrankKai commented Apr 8, 2020

客户端连接 mqtt.connect()

mqtt client建立与mqtt server(broker)的连接,通常是通过给定一个'mqtt', 'mqtts', 'tcp', 'tls', 'ws', 'wss', 'wxs' , 'alis'为协议的url进行连接。

mqtt.connect([url], options)

官方说明:

  • 通过给定的url和配置连接到一个broker,并且返回一个Client。
  • url可以遵循以下协议:'mqtt', 'mqtts', 'tcp', 'tls', 'ws', 'wss', 'wxs' , 'alis'。(mqtt.js支持微信小程序和支付宝小程序,协议分别为wxs和alis。
  • url也可以是通过URL.parse()返回的对象。
  • 可以传入一个单对象,既包含url又包含选项。

再来看一下我手上项目的连接配置,连接结果。
敏感信息已通过foo,bar,baz或者xxxx的组合进行数据脱敏处理。

连接配置

 {
    key: 'xxxxxxxx',
    secret: 'xxxxxxxx',
    url: 'wss://foo-bar.mqtt.baz.com/mqtt',
    groupId: 'FOO_BAR_BAZ_GID',
    topic: {
      publish: 'PUBLISH_TOPIC',
      subscribe: ['SUBSCRIBE_TOPIC/noticePC/', 'SUBSCRIBE_TOPIC/p2p'],
      unsubscribe: 'SUBSCRIBE_TOPIC/noticeMobile/',
    },
}
  • key 账号
  • secret 密码
  • url 用于建立client与server(broker)mqtt连接的链接
  • groupId 组id
  • topic 发送消息的topic,订阅的topic,取消订阅的topic

连接结果

包括总览,响应头和请求头。

General
Request URL: wss://foo-bar.mqtt.baz.com
Request Method: GET
Status Code: 101 Switching Protocols
Response Header
HTTP/1.1 101 Switching Protocols
upgrade: websocket
connection: upgrade
sec-websocket-accept: xxxxxxx
sec-websocket-protocol: mqtt
Request Header
GET wss://foo-bar.mqtt.baz.com/ HTTP/1.1
Host: foo-bar.mqtt.baz.com
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36
Upgrade: websocket
Origin: https://xxx.xxx.com
Sec-WebSocket-Version: 13
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-TW;q=0.6
Sec-WebSocket-Key: xxxxxxxxx
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Protocol: mqtt
源码分析

下面来看这段mqtt连接的代码。

this.client = mqtt.connect(
  {
    url,
    clientId: `${groupId}@@@${deviceId}`,
    username: key,
    password,
  }
);
function parseAuthOptions (opts) {
  var matches
  if (opts.auth) {
    matches = opts.auth.match(/^(.+):(.+)$/)
    if (matches) {
      opts.username = matches[1]
      opts.password = matches[2]
    } else {
      opts.username = opts.auth
    }
  }
}
/**
 * connect - connect to an MQTT broker.
 *
 * @param {String} [brokerUrl] - url of the broker, optional
 * @param {Object} opts - see MqttClient#constructor
 */
function connect (brokerUrl, opts) {
  if ((typeof brokerUrl === 'object') && !opts) {
    //  可以传入一个单对象,既包含url又包含选项
    opts = brokerUrl
    brokerUrl = null
  }
  opts = opts || {}
  // 设置username和password
  parseAuthOptions(opts)
  if (opts.query && typeof opts.query.clientId === 'string') {
    // 设置Client Id
    opts.clientId = opts.query.clientId
  }
  function wrapper (client) {
   ...
    return protocols[opts.protocol](client, opts)
  }
  // 最终返回一个mqtt client实例
  return new MqttClient(wrapper, opts)
}

@FrankKai
Copy link
Owner Author

FrankKai commented Apr 8, 2020

订阅topic mqtt.Client#subscribe()

实际代码

const topic =  {
      subscribe: ['SUBSCRIBE_TOPIC/noticePC/', 'SUBSCRIBE_TOPIC/p2p'],
      unsubscribe: 'SUBSCRIBE_TOPIC/noticeMobile/',
};
const config = { qos:1 };
this.client.subscribe(topic.subscribe, config)

源码分析

MqttClient.prototype.subscribe = function () {
  var packet
  var args = new Array(arguments.length)
  for (var i = 0; i < arguments.length; i++) {
    args[i] = arguments[i]
  }
  var subs = []
   // obj为订阅的topic列表
  var obj = args.shift()
  // qos等配置
  var opts = args.pop()
  var defaultOpts = {
    qos: 0
  }
  opts = xtend(defaultOpts, opts)
  // 数组类型的订阅的topic列表  
  if (Array.isArray(obj)) {
    obj.forEach(function (topic) {
      if (!that._resubscribeTopics.hasOwnProperty(topic) ||
        that._resubscribeTopics[topic].qos < opts.qos ||
          resubscribe) {
        var currentOpts = {
          topic: topic,
          qos: opts.qos
        }
        // subs是最终的订阅的topic列表
        subs.push(currentOpts)
      }
    })
  }
  // 这个packet很重要
  packet = {
    // 发出订阅命令
    cmd: 'subscribe',
    subscriptions: subs,
    qos: 1,
    retain: false,
    dup: false,
    messageId: this._nextId()
  }
  // 发出订阅包
  this._sendPacket(packet)
  return this
}

@FrankKai
Copy link
Owner Author

FrankKai commented Apr 8, 2020

发送消息 mqtt.Client#publish()

实际代码

const topic = {
      publish: 'PUBLISH_TOPIC',
};
const messge = {
   foo: '',
   bar: '',
   baz: '',
   ...
}
const msgStr = JSON.stringify(message);
this.client.publish(topic.publish, msgStr);

注意publish的消息需要使用JSON.stringify进行序列化,然后再发到指定的topic。

源码分析

MqttClient.prototype.publish = function (topic, message, opts, callback) {
  var packet
  var options = this.options
  var defaultOpts = {qos: 0, retain: false, dup: false}
  opts = xtend(defaultOpts, opts)

  // 将消息传入packet的payload
  packet = {
    cmd: 'publish',
    topic: topic,
    payload: message,
    qos: opts.qos,
    retain: opts.retain,
    messageId: this._nextId(),
    dup: opts.dup
  }
  // 处理不同qos
  switch (opts.qos) {
    case 1:
    case 2:
       // 发出publish packet
       this._sendPacketI(packet);
        ...
    default:
       this._sendPacket(packet);
        ...
  }
  return this
}

@FrankKai
Copy link
Owner Author

FrankKai commented Apr 8, 2020

接收消息 mqtt.Client “message”事件

实际代码

this.client.on('message', callback);

数据以callback的方式接收。

function (topic, message, packet) {}

topic代表接收到的topic,buffer则是具体的数据。
message是接收到的数据,谨记通过JSON.parse()对buffer做解析。

handleMessage(callback) {
    this.client.on('message', callback);
}
this.client.handleMessage((topic, buffer) => {
  let receiveMsg = null;
  try {
   receiveMsg = JSON.parse(buffer.toString());
  } catch (e) {
   receiveMsg = null;
  }
  if (!receiveMsg) {
    return;
  }
  ...do something with receiveMsg...
});

源码分析

MqttClient使用inherits包继承了EventEmitter。
从而进行可以使用on监听“message”事件。

inherits(MqttClient, EventEmitter)

那么到底是在哪里间发出message事件的呢?>emit the message event

  1. 基于websocket-stream建立websocket连接
  2. 使用pipe连接基于readable-stream.Writable创建的可写流
  3. nextTick调用_handlePacket
  4. 在handlePacket中调用handlePublish发出message事件
1.基于websocket-stream建立websocket连接
this.stream = this.streamBuilder(this)
function streamBuilder (client, opts) {
  return createWebSocket(client, opts)
}
var websocket = require('websocket-stream')
function createWebSocket (client, opts) {
  var websocketSubProtocol =
    (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
      ? 'mqttv3.1'
      : 'mqtt'

  setDefaultOpts(opts)
  var url = buildUrl(opts, client)
  return websocket(url, [websocketSubProtocol], opts.wsOptions)
}
2. 使用pipe连接基于readable-stream.Writable创建的可写流
var Writable = require('readable-stream').Writable
var writable = new Writable()
this.stream.pipe(writable)
3.nextTick调用_handlePacket
writable._write = function (buf, enc, done) {
    completeParse = done
    parser.parse(buf)
    work()
}
function work () {
    var packet = packets.shift()
    if (packet) {
      that._handlePacket(packet, nextTickWork)
    }
}
function nextTickWork () {
    if (packets.length) {
      process.nextTick(work)
    } else {
      var done = completeParse
      completeParse = null
      done()
    }
}
4. 在handlePacket中调用handlePublish发出message事件
MqttClient.prototype._handlePacket = function (packet, done) {
  switch (packet.cmd) {
    case 'publish':
      this._handlePublish(packet, done)
      break
   ...
}
// emit the message event
MqttClient.prototype._handlePublish = function (packet, done) {
  switch (qos) {
    case 1: {
      // emit the message event
        if (!code) { that.emit('message', topic, message, packet) }
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant