We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
本文原创,著作权归WGrape所有,未经授权,严禁转载
消息由业务方产生,由于不同的Topic代表着不同的路由策略,所以Topic一般也用于区分不同的业务,业务方只需要关心消息所属的Topic
业务方通过调用nsqd提供的HTTP接口(/pub?topic=name&defer=10),实现消息的发布,这时会向nsqd传输一个具有固定格式的数据结构
00000140qwertyuiopasdfgh{"topic":"test","classname":"UserService","methodname":"addUser","param":["userid", "username", "password"],"addtime":"2020-11-27 14:30:34"}
NSQ内部的Http模块当监听到请求/pub接口时,会调用doPUB方法,由doPUB方法实现消息的接收
doPUB方法内部主要做以下事情 (1) 消息内容的检查,主要是长度的检查、参数的检查(如defer) (2) 从请求体中获取消息所属的Topic,并创建一个Topic对象 (3) 由Topic对象实现消息的发布
在Topic对象的PutMessage方法中,主要会进行一次锁处理,以保证消息发布的线程安全
消息的Pub最终由Topic的put方法完成,并把消息送入memoryMsgChan管道中,初始状态下,由于没有订阅此Topic的消费者客户端, 所以消息会在管道中排队,如果管道已满,会把消息写入后备队列中。
writeMessageToBackend是一个抽象的后备队列写入接口,以实现不同的后备队列,如磁盘
在NSQProxy中,startConsume()方法实现对Topic的订阅
在startConsume方法中,核心需要完成两个工作 : 1、创建消费者客户端实例,包括消费配置、消息回调等 2、连接至nsqlookupd,实现消费者客户端与nsqd的动态连接
在nsqd启动成功后,会执行IOLoop,IOLoop会监听Topic对应的管道,当管道中有消息到达时,接收消息并传输给消费者客户端
当消费者客户端接收到消息后,会执行HandleMessage方法,并通过Socket把消息继续向下传递给对应消费机上的MeepoPS进程
在Worker机器上运行的MeepoPS进程,接收到消息后,会先检查数据包的长度,当数据包完全接收完后,会进行一个简单的解码操作,把消息头部的8字节数据去掉,并调用callbackNewData方法,把去掉头部数据的消息传递过去,最终实现对消息的消费
nsqd提供消息发布的功能,可以通过HTTP接口实现,也可以通过Socket通信,传输PUB指令实现。
消息存储在Channel中,如果Channel已满,会把消息存储在后备队列中。其中对Channel和后备队列的存取的切换,由nsqd底层实现,使用无感知
消费者客户端通过注册中心(nsqlookupd)实现与nsqd的动态绑定后,nsqd内部会记录Topic、Channel、Consumer的拓扑结构,其中Topic与Channel是一对多(Broadcast)的关系,Channel和Consumer也是一对多(Load Balance)的关系。
在启动NSQProxy后,会从数据库中获取所有Topic、Channel与Worker的对应关系,依次创建消费者客户端,并通过连接至nsqlookupd,实现消息从NSQ到NSQProxy的转发
The text was updated successfully, but these errors were encountered:
No branches or pull requests
前言
本文原创,著作权归WGrape所有,未经授权,严禁转载
一、架构
二、消息的生命周期
1、产生
消息由业务方产生,由于不同的Topic代表着不同的路由策略,所以Topic一般也用于区分不同的业务,业务方只需要关心消息所属的Topic
2、发布
业务方通过调用nsqd提供的HTTP接口(/pub?topic=name&defer=10),实现消息的发布,这时会向nsqd传输一个具有固定格式的数据结构
3、接收
NSQ内部的Http模块当监听到请求/pub接口时,会调用doPUB方法,由doPUB方法实现消息的接收
doPUB方法内部主要做以下事情
(1) 消息内容的检查,主要是长度的检查、参数的检查(如defer)
(2) 从请求体中获取消息所属的Topic,并创建一个Topic对象
(3) 由Topic对象实现消息的发布
在Topic对象的PutMessage方法中,主要会进行一次锁处理,以保证消息发布的线程安全
消息的Pub最终由Topic的put方法完成,并把消息送入memoryMsgChan管道中,初始状态下,由于没有订阅此Topic的消费者客户端,
所以消息会在管道中排队,如果管道已满,会把消息写入后备队列中。
4、订阅
在NSQProxy中,startConsume()方法实现对Topic的订阅
在startConsume方法中,核心需要完成两个工作 :
1、创建消费者客户端实例,包括消费配置、消息回调等
2、连接至nsqlookupd,实现消费者客户端与nsqd的动态连接
5、传输
在nsqd启动成功后,会执行IOLoop,IOLoop会监听Topic对应的管道,当管道中有消息到达时,接收消息并传输给消费者客户端
5、回调
当消费者客户端接收到消息后,会执行HandleMessage方法,并通过Socket把消息继续向下传递给对应消费机上的MeepoPS进程
6、消费
在Worker机器上运行的MeepoPS进程,接收到消息后,会先检查数据包的长度,当数据包完全接收完后,会进行一个简单的解码操作,把消息头部的8字节数据去掉,并调用callbackNewData方法,把去掉头部数据的消息传递过去,最终实现对消息的消费
三、核心原理
1、消息的发布
nsqd提供消息发布的功能,可以通过HTTP接口实现,也可以通过Socket通信,传输PUB指令实现。
2、消息的存储
消息存储在Channel中,如果Channel已满,会把消息存储在后备队列中。其中对Channel和后备队列的存取的切换,由nsqd底层实现,使用无感知
3、消息的拓扑结构
消费者客户端通过注册中心(nsqlookupd)实现与nsqd的动态绑定后,nsqd内部会记录Topic、Channel、Consumer的拓扑结构,其中Topic与Channel是一对多(Broadcast)的关系,Channel和Consumer也是一对多(Load Balance)的关系。
4、NSQProxy工作原理
在启动NSQProxy后,会从数据库中获取所有Topic、Channel与Worker的对应关系,依次创建消费者客户端,并通过连接至nsqlookupd,实现消息从NSQ到NSQProxy的转发
The text was updated successfully, but these errors were encountered: