You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
首先我们看一下Producer的继承结构:
MQAdmin主要包含一些管理性的接口,比如创建topic、查询某个特定消息以方便排查问题,ClientConfig主要定义了一些基本的配置,比如持久化consumer端消费offset的间隔时间(offset就是consumer端当前消费到的位置,offset的持久化机制也决定了是exactly once 还是根据时间戳等消费),然后再来看DefaultMQProducer,我们发现它将具体的实现都代理给了DefaultMQProducerImpl去做,这个类主要包含了不同的send方法,同步、异步、oneway(发出去就不管了,比如可以用于日志同步)。
本文主要讲解一下阿里巴巴开源的消息队列中间件RocketMQ的producer客户端的发送流程,并简单与Kafka的实现方式做一些对比,希望能够对如何实现一个高性能网络客户端有个大致的了解。
正文
首先我们看一下Producer的继承结构:
MQAdmin主要包含一些管理性的接口,比如创建topic、查询某个特定消息以方便排查问题,ClientConfig主要定义了一些基本的配置,比如持久化consumer端消费offset的间隔时间(offset就是consumer端当前消费到的位置,offset的持久化机制也决定了是exactly once 还是根据时间戳等消费),然后再来看DefaultMQProducer,我们发现它将具体的实现都代理给了DefaultMQProducerImpl去做,这个类主要包含了不同的send方法,同步、异步、oneway(发出去就不管了,比如可以用于日志同步)。
通过这几个接口我们也能了解到其功能,下面来分析一下其具体实现。首先我们看一下它的主要字段以及其含义:
至于Producer的启动流程,可以参考之前的博客,这里主要来看一下核心的发送流程:
简单总结一下上面这坨代码:
这里最后一步发送会委托给MQClientAPIImpl去做,下面我们分析一下这个类的代码:
这里又会去调用
remotingClient
,这里主要是网络IO相关的逻辑,RocketMQ采用了Netty去实现:这里可以看到有几个并发的小技巧,比如RocketMQ里的ResponseFuture#executeInvokeCallback方法,通过AtomicBoolean实现方法执行exactly once语义:
另外再举一个例子,比如CountDownLatch的使用,如果指定为1,那么就可以用于实现一些并发下创建连接等操作,比如线程A创建连接,线程B发现A已经在尝试建立连接了,那么B就可以阻塞在这里,等A完成后,再继续后续操作,这里依然拿ResponseFuture举例子:
总结
相对来说,RocketMQ发送端这里逻辑还是比较简单的,提交一条消息后就通过Netty发送到Broker,而Kafa的会更复杂一点,Kafka这里会做一个合并,客户端提交是放到一个内存队列,然后有一个Sender线程负责根据当前的状态决定是否发送消息,这里还有一个队列会存储所有的回调,当执行完成后统计执行callback,后面有时间再写一篇博客详细分析一下Kafka的发送流程。
The text was updated successfully, but these errors were encountered: