1. 关于RabbitMQ的Connection

Connection可以用来创建多个Channel实例,但是Channel实例不能在线程间共享。某些情况下Channel的操作可以并发运行,但是在其他情况下会导致网络上出现错误的通信帧交错,同时也会影响发送方确认(publisher confirm)机制的运行,所以多线程间共享Channel是非线程安全的。

应用程序应该为每一个线程开辟一个Channel。

2. 慎用方法

  • isOpen,这个方法返回值依赖shutdownCause,会引起锁竞争。

3. 交换器声明参数说明

exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> args) throws IOException;

  • exchange: 交换器名称。
  • type: 交换器名称,四种: direct、fanout、topic、headers。
  • durable: 设置是否持久化。durable设置为true表示持久化,反之是非持久化。
  • autoDelete: 设置是否自动删除,设置为true表示自动删除,自动删除的前提是: 至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。
  • internal: 设置是否是内置的,设置为true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到这个交换器这种方式。
  • argument: 其他的一些结构化参数。alternate-exchange。

其他重载方法参数说明:

exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> args) throws IOException;

nowait 参数指AMQP中定义队列不需要服务器返回,返回值为void。不建议使用

exechangeDeclarePassive(String name)这个方法主要用来检测交换器是否存在,存在正常返回,不存在抛出异常。

ifUnused: 设置是否在交换器没有被使用情况下删除。否则任何情况下都可以删除。

4. 队列参数说明

queueDeclare(String queue, boolean durable, boolean execlusive, boolean autoDelete, Map<String, Object arguments) throws IOException;

  • queue: 队列名称。
  • durable: 设置是否持久化。为true则设置队列持久化。
  • exclusive: 设置是否排他。如果设置为排他,则仅对首次声明它的连接可见,并在连接断开时(即使设置了持久化)自动删除。排他队列是基于连接(Connection)可见的,同一个连接断开时自动删除。排他是基于连接的,同一个连接的不同信道是可以同时访问同一连接创建的排他队列。
  • autoDelete: 设置是否自动删除,自动删除的前提是: 至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
  • arguments: 设置队列的其他参数。x-message-ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority等。

消费者在信道上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,将信道设置为“传输”。

ifEmpty: 设置为true,表示在队列为空的情况下才能删除。

5. 发送消息参数说明

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

  • exchange: 交换器名称,如果设置为空,则消息会发送到默认交换器中。
  • routingKey: 路由健, 交换器根据路由健将消息存储到相应的队列中。
  • props: 消息的基本属性集: 其中包含属性成员,分别有contentType、contentEncoding、headers(Map<String, Object>)、deliverMode、priority、correlationId、replyTo、expriation、messageId、timestamp、type、userId、appId、clusterId。
  • byte[] body: 消息体。
  • mandatory: 设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.return命令将消息返回给生产者。否则消息直接被丢弃。
  • immediate: 设置为true时, 如果交换器在将消息路由到队列都没有消费者时,该消息会通过Basic.Return返回至生产者。

6. 使用mandatory参数时如何获取没有被正确路由到合适队列的消息

可以通过调用channel.addReturnListener来添加ReturnListener监听器实现。

7. 消费消息参数说明

String basicConsume(String queue, boolean actoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;

  • queue: 队列的名称。
  • autoAck: 设置是否自动确认。
  • consumerTag: 消费者标签,用来区分多个消费者。
  • noLocal: 设置true表示不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者。
  • exclusive: 设置是否排他。只允许当前连接消费。
  • argument: 设置消费者的其他参数。
  • callback: 设置消费者的回调函数。

8. 确认消息参数说明

void basicAck(long deliveryTag, boolean multiple) throws IOException;

  • deliveryTag: 当一个消费者向RabbitMQ注册后,RabbitMQ会用 basic.deliver方法向消费者推送消息,这个方法携带了一个delivery tag,它在一个channel中唯一代表了一次投递。delivery tag的唯一标识范围限于channel(待验证)。delivery tag是单调递增的正整数,客户端获取投递的方法用dellivery tag作为一个参数。
  • mutiple: ack unack 的mutiple参数设置为true,则可以一次性应答delivery_tag小于等于传入值的所有应答。

9. 备份交换器

如果不设置mandatory参数,那么在消息在未被路由的情况下将会丢失,设置了mandatory参数,那么需要添加ReturnListener的编程逻辑,生产者代码将变的复杂。

如果不想采取上述措施取确保消息发送成功,可以通过声明备份交换器,声明交换器时使用alternate-exchange参数实现,也可以通过Policy方式实现。

备份交换器如下特殊情况:

  • 如果设置的备份交换器不存在,则不会有异常出现,消息丢失。
  • 如果设置的备份交换器未绑定任何队列,则不会有异常出现,消息丢失。
  • 如果设置的备份交换器未匹配任何队列,则不会有异常出现,消息丢失。
  • 如果备份交换器和mandatory一起使用,则mandatory失效。

10. 消息过期时间

目前有两种方式设置消息的过期时间:

  1. 通过队列属性设置,所有消息都有相同的过期时间。设置队列属性x-message-ttl或者通过Policy设置。也可以通过HTTP API。消息过期会被立即删除。

  2. 通过对消息本身单独设置。在basicPulish方法中添加expiration的属性参数。也可以通过HTTP API。消息过期不会被立即删除,只有当消费者消费消息时判定消息是否过期。

如果两种方式一块使用,则以过期时间较小的为准。

消息一旦到达过期时间,就会变成死信。

如果不设置ttl,则消息不会过期;如果ttl设置为0,此时除非将消息直接投递给消费者,否则该消息会被立即丢弃。

设置消息队列x-expires参数可以控制自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何消费者,队列也没有被重新声明,并且在过期时间段内也未调用过Basic.Get命令。

11. 死信队列、死信交换器(Dead-Letter-Exchange)

当一个消息在一个队列中变成死信时,它可以被重新发送到另一个交换器(DLX)中,绑定DLX的就被称为私信队列。

12. 延迟队列

延迟队列存储的对象时对应的延迟消息。延迟消息指消息被发送后(生产者?),并不想让消费者立即拿到这个消息,而是等待特定时间后,消费者才能拿这个消息消费。

13. 优先级队列

具有高优先级的队列具有高的被优先消费的特权。设置x-max-priority参数实现。

14. RPC

时序图:

rabbitmq-rpc-seq

流程图:
rabbitmq-rpc-logic

15. 持久化

RabbitMQ持久化分为3个部分:交换器、队列、消息持久化。

交换器和队列通过durable设置持久化;消息持久化通过投递模式(deliveryMode)设置为2实现。

注意:持久化消息正确存入RabbitMQ时,还需要一定时间才能存入磁盘。为了解决这个问题,可以引入RabbitMQ的队列镜像机制,如果主节点挂掉,可以自动切换到从节点,这样就保证了队列的可靠性。

16. 生产者确认

为了保证消息正确存入RabbitMQ,有两种解决方式:

  • 事务方式: 虽然保证了数据的正确性,但是损耗RabbitMQ性能非常大(每次多一次命令帧的交互)。
  • 通过发送方确认实现: 消息被发送到匹配队列后,RabbitMQ就会发送一个Basic.Ack给生产者。如果消息和队列是可持久化的,那么消息被写入磁盘后,才会发出ack确认。

发送方确认有4种方式: 事务和普通confirm,批量和异步confirm。

批量confirm: 发送一批数据后,调用waitForConfirms方法,等待服务器确认返回。缺点是,如果存在一条NACK,则全部需要重新发送,容易造成重复数据。
异步confirm: 提供一个回调方法,服务端确认了一条或者多条消息后客户端会回调这个方法处理。

事务机制和发送确认机制是互斥的。

事务机制和发送确认机制确保消息能够正确的发至RabbitMQ,这里的发至指正确的发往至RabbitMQ的交换器,如果此时交换器没有绑定队列,消息丢失。

17. 消费消息

消费消息需要注意:

  • 消息分发: 轮询的方式。
  • 消息的顺序性: 消费者消息的消息和发布消息的顺序是一致的。RabbitMQ的顺序保障是从消息存入队列开始的。
  • 弃用QueueingConsumer

basicQos(int prefetchCount, int PrefetchSize, boolean global)参数说明:

  • prefetchCount: 设置为0表示消息数量没有上限。
  • prefetchSize: 表示消费者所能接收为确认的总体大小上限,单位为B,设置为0表示没有上限。
  • global: true表示信道上所有的消费者都要遵从prefetchCount的限定值;false表示信道上新的消费者需要遵循prefetchCount的限定值。

如果需要保证消息的顺序性,可以在消息体内加入序列来保持消息消费的有序性。

QueueingConsumer存在的问题:

  • 消费者内存溢出,导致服务宕机,队列堆积。
  • 拖累同一个Connection的所有信道,使其性能降低。
  • 同步递归调用QueueingConsumer会导致死锁。
  • RabbitMQ的自动连接恢复机制不支持QueueingConsumer。
  • QueueingConsumer不是事件驱动的。

建议尽量使用继承DefaultConsumer的方式。

18. 消息传输保障

At most once: 最多一次。可能丢失,但不重复。

At least once: 最少一次。不会丢失,可能重复。

Exactly once: 恰好一次。不会丢失,不会重复。此方式RabbitMQ无法保证。

参考资料

RabbitMQ实战指南