RabbitMQ 学习随笔
| 本文总阅读量次1. 关于RabbitMQ的Connection
Connection可以用来创建多个Channel实例,但是Channel实例不能在线程间共享。某些情况下Channel的操作可以并发运行,但是在其他情况下会导致网络上出现错误的通信帧交错,同时也会影响发送方确认(publisher confirm)机制的运行,所以多线程间共享Channel是非线程安全的。
应用程序应该为每一个线程开辟一个Channel。
2. 慎用方法
- isOpen,这个方法返回值依赖shutdownCause,会引起锁竞争。
3. 交换器声明参数说明
exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> args) throws IOException;
- exchange: 交换器名称。
- type: 交换器名称,四种: direct、fanout、topic、headers。
- durable: 设置是否持久化。durable设置为true表示持久化,反之是非持久化。
- autoDelete: 设置是否自动删除,设置为true表示自动删除,自动删除的前提是: 至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。
- internal: 设置是否是内置的,设置为true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到这个交换器这种方式。
- argument: 其他的一些结构化参数。alternate-exchange。
其他重载方法参数说明:
exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> args) throws IOException;
nowait 参数指AMQP中定义队列不需要服务器返回,返回值为void。不建议使用
exechangeDeclarePassive(String name)
这个方法主要用来检测交换器是否存在,存在正常返回,不存在抛出异常。
ifUnused: 设置是否在交换器没有被使用情况下删除。否则任何情况下都可以删除。
4. 队列参数说明
queueDeclare(String queue, boolean durable, boolean execlusive, boolean autoDelete, Map<String, Object arguments) throws IOException;
- queue: 队列名称。
- durable: 设置是否持久化。为true则设置队列持久化。
- exclusive: 设置是否排他。如果设置为排他,则仅对首次声明它的连接可见,并在连接断开时(即使设置了持久化)自动删除。排他队列是基于连接(Connection)可见的,同一个连接断开时自动删除。排他是基于连接的,同一个连接的不同信道是可以同时访问同一连接创建的排他队列。
- autoDelete: 设置是否自动删除,自动删除的前提是: 至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
- arguments: 设置队列的其他参数。x-message-ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority等。
消费者在信道上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,将信道设置为“传输”。
ifEmpty: 设置为true,表示在队列为空的情况下才能删除。
5. 发送消息参数说明
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
- exchange: 交换器名称,如果设置为空,则消息会发送到默认交换器中。
- routingKey: 路由健, 交换器根据路由健将消息存储到相应的队列中。
- props: 消息的基本属性集: 其中包含属性成员,分别有contentType、contentEncoding、headers(Map<String, Object>)、deliverMode、priority、correlationId、replyTo、expriation、messageId、timestamp、type、userId、appId、clusterId。
- byte[] body: 消息体。
- mandatory: 设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.return命令将消息返回给生产者。否则消息直接被丢弃。
- immediate: 设置为true时, 如果交换器在将消息路由到队列都没有消费者时,该消息会通过Basic.Return返回至生产者。
6. 使用mandatory参数时如何获取没有被正确路由到合适队列的消息
可以通过调用channel.addReturnListener来添加ReturnListener监听器实现。
7. 消费消息参数说明
String basicConsume(String queue, boolean actoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
- queue: 队列的名称。
- autoAck: 设置是否自动确认。
- consumerTag: 消费者标签,用来区分多个消费者。
- noLocal: 设置true表示不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者。
- exclusive: 设置是否排他。只允许当前连接消费。
- argument: 设置消费者的其他参数。
- callback: 设置消费者的回调函数。
8. 确认消息参数说明
void basicAck(long deliveryTag, boolean multiple) throws IOException;
- deliveryTag: 当一个消费者向RabbitMQ注册后,RabbitMQ会用 basic.deliver方法向消费者推送消息,这个方法携带了一个delivery tag,它在一个channel中唯一代表了一次投递。delivery tag的唯一标识范围限于channel(待验证)。delivery tag是单调递增的正整数,客户端获取投递的方法用dellivery tag作为一个参数。
- mutiple: ack unack 的mutiple参数设置为true,则可以一次性应答delivery_tag小于等于传入值的所有应答。
9. 备份交换器
如果不设置mandatory参数,那么在消息在未被路由的情况下将会丢失,设置了mandatory参数,那么需要添加ReturnListener的编程逻辑,生产者代码将变的复杂。
如果不想采取上述措施取确保消息发送成功,可以通过声明备份交换器,声明交换器时使用alternate-exchange参数实现,也可以通过Policy方式实现。
备份交换器如下特殊情况:
- 如果设置的备份交换器不存在,则不会有异常出现,消息丢失。
- 如果设置的备份交换器未绑定任何队列,则不会有异常出现,消息丢失。
- 如果设置的备份交换器未匹配任何队列,则不会有异常出现,消息丢失。
- 如果备份交换器和mandatory一起使用,则mandatory失效。
10. 消息过期时间
目前有两种方式设置消息的过期时间:
通过队列属性设置,所有消息都有相同的过期时间。设置队列属性x-message-ttl或者通过Policy设置。也可以通过HTTP API。消息过期会被立即删除。
通过对消息本身单独设置。在basicPulish方法中添加expiration的属性参数。也可以通过HTTP API。消息过期不会被立即删除,只有当消费者消费消息时判定消息是否过期。
如果两种方式一块使用,则以过期时间较小的为准。
消息一旦到达过期时间,就会变成死信。
如果不设置ttl,则消息不会过期;如果ttl设置为0,此时除非将消息直接投递给消费者,否则该消息会被立即丢弃。
设置消息队列x-expires参数可以控制自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何消费者,队列也没有被重新声明,并且在过期时间段内也未调用过Basic.Get命令。
11. 死信队列、死信交换器(Dead-Letter-Exchange)
当一个消息在一个队列中变成死信时,它可以被重新发送到另一个交换器(DLX)中,绑定DLX的就被称为私信队列。
12. 延迟队列
延迟队列存储的对象时对应的延迟消息。延迟消息指消息被发送后(生产者?),并不想让消费者立即拿到这个消息,而是等待特定时间后,消费者才能拿这个消息消费。
13. 优先级队列
具有高优先级的队列具有高的被优先消费的特权。设置x-max-priority参数实现。
14. RPC
时序图:
流程图:
15. 持久化
RabbitMQ持久化分为3个部分:交换器、队列、消息持久化。
交换器和队列通过durable设置持久化;消息持久化通过投递模式(deliveryMode)设置为2实现。
注意:持久化消息正确存入RabbitMQ时,还需要一定时间才能存入磁盘。为了解决这个问题,可以引入RabbitMQ的队列镜像机制,如果主节点挂掉,可以自动切换到从节点,这样就保证了队列的可靠性。
16. 生产者确认
为了保证消息正确存入RabbitMQ,有两种解决方式:
- 事务方式: 虽然保证了数据的正确性,但是损耗RabbitMQ性能非常大(每次多一次命令帧的交互)。
- 通过发送方确认实现: 消息被发送到匹配队列后,RabbitMQ就会发送一个Basic.Ack给生产者。如果消息和队列是可持久化的,那么消息被写入磁盘后,才会发出ack确认。
发送方确认有4种方式: 事务和普通confirm,批量和异步confirm。
批量confirm: 发送一批数据后,调用waitForConfirms方法,等待服务器确认返回。缺点是,如果存在一条NACK,则全部需要重新发送,容易造成重复数据。
异步confirm: 提供一个回调方法,服务端确认了一条或者多条消息后客户端会回调这个方法处理。
事务机制和发送确认机制是互斥的。
事务机制和发送确认机制确保消息能够正确的发至RabbitMQ,这里的发至指正确的发往至RabbitMQ的交换器,如果此时交换器没有绑定队列,消息丢失。
17. 消费消息
消费消息需要注意:
- 消息分发: 轮询的方式。
- 消息的顺序性: 消费者消息的消息和发布消息的顺序是一致的。RabbitMQ的顺序保障是从消息存入队列开始的。
- 弃用QueueingConsumer
basicQos(int prefetchCount, int PrefetchSize, boolean global)
参数说明:
- prefetchCount: 设置为0表示消息数量没有上限。
- prefetchSize: 表示消费者所能接收为确认的总体大小上限,单位为B,设置为0表示没有上限。
- global: true表示信道上所有的消费者都要遵从prefetchCount的限定值;false表示信道上新的消费者需要遵循prefetchCount的限定值。
如果需要保证消息的顺序性,可以在消息体内加入序列来保持消息消费的有序性。
QueueingConsumer存在的问题:
- 消费者内存溢出,导致服务宕机,队列堆积。
- 拖累同一个Connection的所有信道,使其性能降低。
- 同步递归调用QueueingConsumer会导致死锁。
- RabbitMQ的自动连接恢复机制不支持QueueingConsumer。
- QueueingConsumer不是事件驱动的。
建议尽量使用继承DefaultConsumer的方式。
18. 消息传输保障
At most once: 最多一次。可能丢失,但不重复。
At least once: 最少一次。不会丢失,可能重复。
Exactly once: 恰好一次。不会丢失,不会重复。此方式RabbitMQ无法保证。
参考资料
- 本文链接: http://blog.programer.group/middleware/2020-02-21-rabbitmq/
- 版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!