#Spring AMQP
官方定义: The Spring AMQP project applies core Spring concepts to the development of AMQP-based messaging solutions. It provides a "template" as a high-level abstraction for sending and receiving messages. It also provides support for Message-driven POJOs with a "listener container". These libraries facilitate management of AMQP resources while promoting the use of dependency injection and declarative configuration. In all of these cases,you will see similarities to the JMS support in the Spring Framework.
#特性
The project consists of two parts;
- spring-amqp is the base abstraction,
- spring-rabbit is the RabbitMQ implementation.
- Listener container for asynchronous processing of inbound messages
- RabbitTemplate for sending and receiving messages
- RabbitAdmin for automatically declaring queues, exchanges and bindings
#什么是RabbitMQ
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统, 用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然:
##decoupling(单项解耦)
一个生产者,一个消费者
producer -->RabbitMQ--> consumer
##bidrectional decoupling(双向解耦)
一个生产者,多个消费者
/consumer
producer -->RabbitMQ--> \consumer
##Consumer(消费者)
##exchange(交换机)
1.接收消息,转发消息到绑定的队列。四种类型:
- direct:转发消息到routingKey指定的队列
- topic:按规则转发消息(最灵活)
- headers:(这个还没有接触到)
- fanout:转发消息到所有绑定队列
-
如果没有队列绑定在交换机上,则发送到该交换机上的消息会丢失。
-
一个交换机可以绑定多个队列,一个队列可以被多个交换机绑定。
-
topic类型交换器通过模式匹配分析消息的routing-key属性。 它将routing-key和binding-key的字符串切分成单词。这些单词之间用点隔开。 它同样也会识别两个通配符:#匹配0个或者多个单词,匹配一个单词。 例如,binding key:.stock.#匹配routing key:usd.stcok和eur.stock.db,但是不匹配stock.nana。 还有一些其他的交换器类型,如header、failover、system等,现在在当前的RabbitMQ版本中均未实现。
-
因为交换器是命名实体,声明一个已经存在的交换器,但是试图赋予不同类型是会导致错误。客户端需要删除这个已经存在的交换器,然后重新声明并且赋予新的类型。
-
交换器的属性:
-
持久性:如果启用,交换器将会在server重启前都有效。
-
自动删除:如果启用,那么交换器将会在其绑定的队列都被删除掉之后自动删除掉自身。
-
惰性:如果没有声明交换器,那么在执行到使用的时候会导致异常,并不会主动声明。
##队列(queue)
-
队列是RabbitMQ内部对象,存储消息。相同属性的queue可以重复定义。
-
临时队列。channel.queueDeclare(),有时不需要指定队列的名字,并希望断开连接时删除队列。
-
队列的属性:
-
持久性:如果启用,队列将会在server重启前都有效。
-
自动删除:如果启用,那么队列将会在所有的消费者停止使用之后自动删除掉自身。
-
惰性:如果没有声明队列,那么在执行到使用的时候会导致异常,并不会主动声明。
-
排他性:如果启用,队列只能被声明它的消费者使用。
这些性质可以用来创建例如排他和自删除的transient或者私有队列。 这种队列将会在所有链接到它的客户端断开连接之后被自动删除掉。 它们只是短暂地连接到server,但是可以用于实现例如RPC或者在AMQ上的对等通信。
- RPC的使用是这样的:RPC客户端声明一个回复队列,唯一命名(例如用UUID),并且是自删除和排他的。
然后它发送请求给一些交换器,在消息的reply-to字段中包含了之前声明的回复队列的名字。 RPC服务器将会回答这些请求,使用消息的reply-to作为routing key(默认绑定器会绑定所有的队列到默认交换器, 名称为“amp.交换器类型名”)发送到默认交换器。 注意这仅仅是惯例而已,可以根据和RPC服务器的约定,它可以解释消息的任何属性(甚至数据体)来决定回复给谁。
##消息传递:
-
消息在队列中保存,以轮询的方式将消息发送给监听消息队列的消费者,可以动态的增加消费者以提高消息的处理能力。
-
为了实现负载均衡,可以在消费者端通知RabbitMQ,一个消息处理完之后才会接受下一个消息。
channel.basic_qos(prefetch_count=1)
注意:要防止如果所有的消费者都在处理中,则队列中的消息会累积的情况。
- 消息有14个属性,最常用的几种:
deliveryMode:持久化属性
contentType:编码
replyTo:指定一个回调队列
correlationId:消息id
实例代码:
-
消息生产者可以选择是否在消息被发送到交换器并且还未投递到队列(没有绑定器存在) 和/或没有消费者能够立即处理的时候得到通知。 通过设置消息的mandatory和/或immediate属性为真, 这些投递保障机制的能力得到了强化。
-
此外,一个生产者可以设置消息的persistent属性为真。 这样一来,server将会尝试将这些消息存储在一个稳定的位置,直到server崩溃。 当然,这些消息肯定不会被投递到非持久的队列中。
##高可用性(HA):
- 消息ACK,通知RabbitMQ消息已被处理,可以从内存删除。 如果消费者因宕机或链接失败等原因没有发送ACK(不同于ActiveMQ,在RabbitMQ里,消息没有过期的概念), 则RabbitMQ会将消息重新发送给其他监听在队列的下一个消费者。
channel.basicConsume(queuename, noAck=false, consumer);
- 消息和队列的持久化。定义队列时可以指定队列的持久化属性(问:持久化队列如何删除?)
channel.queueDeclare(queuename, durable=true, false, false, null);
发送消息时可以指定消息持久化属性:
channel.basicPublish(exchangeName, routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
这样,即使RabbitMQ服务器重启,也不会丢失队列和消息。
-
publisher confirms
-
master/slave机制,配合Mirrored Queue,这种情况下,publisher会正常发送消息和接收消息的confirm, 但对于subscriber来说,需要接收Consumer Cancellation Notifications来得到主节点失败的通知, 然后re-consume from the queue,此时要求client有处理重复消息的能力。 注意:如果queue在一个新加入的节点上增加了一个slave, 此时slave上没有此前queue的信息(目前还没有同步机制)。
(通过命令行或管理插件可以查看哪个slave是同步的:
rabbitmqctl list_queues name slave_pids synchronised_slave_pids)
当一个slave重新加入mirrored-queue时,如果queue是durable的,则会被清空。
##集群(cluster):
-
不支持跨网段(如需支持,需要shovel或federation插件)
-
可以随意的动态增加或减少、启动或停止节点,允许节点故障
-
集群分为RAM节点和DISK节点,一个集群最好至少有一个DISK节点保存集群的状态。
-
集群的配置可以通过命令行,也可以通过配置文件,命令行优先。
#RabbitMQ 安装使用 ##windows下安装
otp_win64_18.0.exe
rabbitmq-server-3.5.3.exe
需要配置ERLANG_HOME
启动管理端:rabbitmq-plugins enable rabbitmq_management
管理访问地址:http://localhost:15672/#/
默认用户为:guest,密码为:guest
##使用docker 容器安装rabbit(暂不行)
-h, --hostname= Container host name 指定容器主机名
--name= Assign a name to the container 为容器分配一个名称
-d, --detach=false Run container in background and print container ID
‘启动容器’
[root@localhost ~]# docker run -d --hostname heidsoft-rabbit --name l2cloud-rabbit docker.io/rabbitmq
8f33ff656854627ae2279e8b050d0e7298043c60aeb30d232a840c59d83a7099
‘查看容器日志’
[root@localhost ~]# docker logs l2cloud-rabbit
RabbitMQ 3.5.3. Copyright (C) 2007-2014 GoPivotal, Inc.
## ## Licensed under the MPL. See http://www.rabbitmq.com/
## ##
########## Logs: tty
###### ## tty
##########
Starting broker...
=INFO REPORT==== 16-Jul-2015::06:00:22 ===
Starting RabbitMQ 3.5.3 on Erlang 17.5.3
Copyright (C) 2007-2014 GoPivotal, Inc.
Licensed under the MPL. See http://www.rabbitmq.com/
=INFO REPORT==== 16-Jul-2015::06:00:22 ===
node : rabbit@heidsoft-rabbit
home dir : /var/lib/rabbitmq
config file(s) : /etc/rabbitmq/rabbitmq.config
cookie hash : ghIzfU22acv7t0IZT1ijqA==
log : tty
sasl log : tty
database dir : /var/lib/rabbitmq/mnesia/rabbit@heidsoft-rabbit
##Linux(centos7)上安装rabbit
curl --user admin:Qv5ZUk55GfjR http://127.0.0.1:15672/api/vhosts
yum install erlang xmlto
curl --user admin:Qv5ZUk55GfjR http://<host>:<port>/api/vhosts
docker ps -a | gawk '{print $1}'
[root@localhost ~]# docker run -d --hostname heidsoft-rabbit --name l2cloud-rabbit -p 15672:15672 5672:5672 docker.io/rabbitmq
rpm --import https://www.rabbitmq.com/rabbitmq-signing-key-public.asc
yum install rabbitmq-server-3.5.3-1.noarch.rpm