1. ConnectionFactory

工厂类的一些属性:

Connection and Channel Listeners用来监听Connection和Channel相关的事件(创建、关闭、终止)。

Channel关闭事件日志策略。

运行时缓存属性配置。在运行时动态的修改连接属性,比如,channel缓存大小、事务channel缓存大小。

RabbitMQ 自动的连接和拓扑恢复。

添加自定义客户端连接属性。
connectionFactory.getRabbitConnectionFactory().getClientProperties().put("thing1", "thing2");

1.1 CachingConnectionFactory

CachingConnectionFactory有两种缓存模式:Connection和Channel。

CachingConnectionFactory是一个Connection工厂实现类(当缓存模式是CachingConnectionFactory.CacheMode.CHANNEL(默认)时,返回所有createConnection()调用都返回相同的Connection,忽略对Connection.close()的调用并缓存Channel)。
默认情况下,只缓存25个Channel,并根据需要创建和处理其他请求的Channel。在高并发情况下考虑提高channelCacheSize的值。channelCacheSize不限制channel的创建数量,如果需要限制,可以使用channelCheckoutTimeout,当此 channelCheckoutTimeout 大于零时,channelCacheSize将成为可以在连接上创建的 channel 数量的限制。如果达到限制,则调用线程将阻塞,直到 channel 可用或达到此超时,在这种情况下抛出AmqpTimeoutException。

当缓存模式是CachingConnectionFactory.CacheMode.CONNECTION时,每个createConnection()都会使用一个新的(缓存的)Connection;连接将根据connectionCacheSize的值缓存。在这种模式下,Connections和Channels都会被缓存。connectionLimit可以用来限制创建Connection的数量,当数量到达时可以使用channelCheckoutTimeLimit等待缓存中的连接空闲,如果超时,将抛出AmqpTimeoutException

当使用比较大数量的Connection时,应该考虑在CachingConnectionFactory设置自定义的executorexecutor的线程池应不受限制,或应针对预期用途进行适当设置(通常一个线程一个连接)。如果在每个Connection创建多个Channel,线程池的大小影响并发。因此,可变(或简单缓存)线程池执行程序将是最合适的。

CachingConnectionFactory.CacheMode.CONNECTION模式和Rabbit Admin自动声明的队列不兼容。

CachingConnectionFactory要求显式关闭从其Connection获得的所有Channel。通常,代码都需要显式的关闭所有的Channel。但是,对于此ConnectionFactory,必须使用显式关闭才能真正使Channel重用。如果有空间(channelCacheSize),Channel.close()会将通道返回到缓存,否则物理上关闭该通道。

1.2 SimpleRoutingConnectionFactory

对多个ConnectionFactory进行管理。

  1. SimpleResourceHolder绑定lookupKey
  2. RabbitTemplate表达式支持

2. AmqpTemplate

2.1 添加重试机制

第一种方式是在rabbitTemplate中注入RetryTemplate,这种方式如果使用RecoveryCallback则只能在重试上下文中使用lastThrowable。

1
2
3
4
5
6
7
8
9
10
11
12
13

@Bean
public AmqpTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);
return template;
}

第二种方式是直接使用retryTemplate进行重试,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
retryTemplate.execute(
new RetryCallback<Object, Exception>() {

@Override
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute("message", message);
return rabbitTemplate.convertAndSend(exchange, routingKey, message);
}

}, new RecoveryCallback<Object>() {

@Override
public Object recover(RetryContext context) throws Exception {
Object message = context.getAttribute("message");
Throwable t = context.getLastThrowable();
// Do something with message
return null;
}
});
}

这种方式你可以在上下文中获取属性信息。

2.2 怎样确认异步消息的成功和失败

默认情况下,rabbitmq将会删除不能被路由的消息,对于成功的消息,会返回一个异步的确认信息。
考虑如下两种情况:

  • 发送消息没有匹配的队列
  • 发送消息没有匹配exchange

第一种情况可以使用消息返回处理;第二种情况消息被删除,不会返回消息,channel关闭并抛出异常。这种情况你可以注册ChannelListener。

1
2
3
4
5
6
7
8
9
10
11
12
13
this.connectionFactory.addConnectionListener(new ConnectionListener() {

@Override
public void onCreate(Connection connection) {
}

@Override
// signal中包含reson信息
public void onShutDown(ShutdownSignalException signal) {
...
}

});

2.3 发送消息确认与返回

2.3.1 消息返回

消息返回必须设置mandatory属性,或者发送消息时针对特定消息设置mandatory-expression属性。

消息返回可以通过rabbitTemplate设置返回回调。

rabbitTemplate.setReturnCallback(ReturnCallback callback)

注意:一个rabbitTemplate实例仅支持一个ReturnCallback。

2.3.2 消息确认

消息确认需要设置publisherConfirms属性。

消息确认同样可以设置ConfirmCallback,用来处理消息返回确认。

值得特别注意的是使用消息确认时,需要等到最后一个消息确认完成时再关闭channel。

因为当connectfactory的channelCacheSize满了时,框架会在保持5秒channel后关闭channel,有可能导致没有足够的时间确认和消息返回。
当仅使用返回时,channel保持5秒开启。

只要返回回调在60秒或更短时间内执行,就可以保证在确认之前仍接收到返回的消息。确认在返回回调退出后或60秒后(以先到者为准)进行传递。

自从2.1版本开始,开始使用CorrelationData,而不是ConfirmCallback。

1
2
3
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());

另外,当同时启用确认和返回,CorrelationData由返回消息组成。它保证在使用将来设置ack之前发生这种情况。

2.4 一种更简单的等待发布者确认的机制

有时可能希望更好地控制通道的使用,并确保在同一通道上执行全部操作。

从2.0版本,提供了invoke方法,并带有一个OperationCallback。任何操作在callback的范围内及在提供的RabbitOpertions参数中执行都使用相同的Channel,这个channel在结束时关闭(不返回至缓存)。如果ChannelPublisherCallbackChannel,它在所有确认收到后返回至缓存。

1
2
3
4
5
6
@FunctionalInterface
public interface OperationsCallback<T> {

T doInRabbit(RabbitOperations operations);

}

您可能需要这样做的一个示例是,如果您希望在基础Channel上使用waitForConfirms()方法。如前所述,Spring API以前并未公开此方法,因为通常是对通道进行缓存和共享。 RabbitTemplate现在提供了waitForConfirms(long timeout)和waitForConfirmsOrDie(long timeout),它们委托给OperationsCallback范围内使用特定channel。由于明显的原因,不能在该范围之外使用这些方法。

如果您只想等待broker确认交付,则可以使用以下示例中显示的技术:

1
2
3
4
5
6
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
});

如果您希望在OperationsCallback范围内的同一通道上调用RabbitAdmin操作,则必须使用与调用操作相同的RabbitTemplate构造admin。

如果在现有事务的范围内已经执行了template操作,例如,当在事务处理的listener container线程上运行并在事务处理的template上执行操作时,前面的讨论就没有什么意义了。在这种情况下,操作将在该通道上执行,并在线程返回到容器时提交。在这种情况下,不必使用invoke。

2.5 Publisher使用单独的连接

从2.0.2版本开始,将usePublisherConnection属性设置为true,以便可能的情况下使用与listener容器使用的连接不同的连接。

如果声明的是独占队列,会因为使用不同的连接,导致其中某一个连接获取不到队列
通常,不应将RabbitAdmin与具有此设置为 true 的模板一起使用。使用带有连接工厂的RabbitAdmin构造函数;如果使用其他构造函数来获取模板,请确保模板的 property 为 false。这是因为通常使用 admin 来为 listener 容器声明队列;使用 property 设置为 true 的模板意味着将在与 listener 容器使用的连接不同的连接上声明独占队列(例如AnonymousQueue)。在这种情况下,容器不能使用队列。

3. 发送消息

3.1 Message创建API

1
2
3
4
5
6
7
8
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();
1
2
3
4
5
Message message = MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();

3.2 批处理

BatchingRabbitTemplate

批量数据保存在 memory 中;未发送的消息可能会在系统故障的 event 中丢失。

BatchingRabbitTemplate根据SimpleBatchingStrategy发送消息。它支持将消息发送到单个 exchange/routing key。它有 properties:

  • batchSize - 发送前批处理中的消息数

  • bufferLimit - 批量消息的最大大小;如果超过,将抢占batchSize,并导致发送部分批次

  • timeout - 一个 time,当没有新的活动向批处理添加消息时,将发送部分批处理

SimpleBatchingStrategy通过在每个嵌入消息之前使用4字节二进制长度来格式化批处理。通过将springBatchFormat message property 设置为lengthHeader4,将其传达给接收系统。

3.3 消费消息

3.3.1 轮询消费者

默认情况下,如果没有可用消息,则立即返回null;没有阻挡。从 version 1.5 开始,您现在可以设置receiveTimeout(以毫秒为单位),接收方法将阻塞,知道最大过期时间。value 小于零意味着无限期地阻塞(或者至少在与 broker 的连接丢失之前)。 Version 1.6 引入了receive方法的变体,允许在每次调用时传递超时时间。

这种轮询操作会为每一个消息创建一个QueueingConsumer。所以不适用于大数据量的环境。

3.3.2 异步消费者

3.3.2.1 MessageListener

异步消息接收,涉及专用的回调容器。回调则是我们与消息集成的地方。MessageListener就是这种回调。

1
2
3
public interface MessageListener {
void onMessage(Message message);
}

如果您的回调逻辑出于任何原因依赖于 AMQP Channel 实例,您可以改为使用ChannelAwareMessageListener。

1
2
3
public interface ChannelAwareMessageListener {
void onMessage(Message message, Channel channel) throws Exception;
}

3.3.2.2 MessageListenerAdapter

如果您希望在 application 逻辑和消息传递 API 之间保持更严格的分离,则可以依赖 framework 提供的适配器 implementation。这通常被称为“Message-driven POJO”支持。

从 version 2.0 开始,提供了方便的FunctionalInterface:

1
2
3
4
5
6
@FunctionalInterface
public interface ReplyingMessageListener<T, R> {

R handleMessage(T t);

}

这有助于使用 Java 8 lamdas 方便地配置适配器:

1
2
3
4
new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> {
...
return result;
}));

3.3.2.3 消费者优先权

从RabbitMQ Version 3.2开始,broker 现在支持 consumer 优先级(参见在 RabbitMQ 中使用 Consumer 优先级)。通过在 consumer 上设置x-priority参数来启用此功能。 SimpleMessageListenerContainer现在支持设置 consumer arguments:

1
2
container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));

3.3.2.4 容器监听自动删除队列

当容器配置为侦听auto-delete queue(s),或者队列具有x-expires选项或 Broker 上配置了Time-To-Live policy 时,broker 在容器停止时删除队列(最后 consumer 被取消)。在 version 1.3 之前,由于队列丢失,无法重新启动容器;当连接是 closed/opens 时,RabbitAdmin仅自动重新声明队列等,当容器为 stopped/started 时不会发生。

从 version 1.3 开始,容器现在将使用RabbitAdmin在启动期间重新声明任何丢失的队列。

您还可以使用条件声明和auto-startup=”false”一起来管理延迟声明队列,直到容器启动。

在这种情况下,队列和交换由containerAdmin声明,它具有auto-startup=”false”,因此在 context 初始化期间不会声明元素。此外,出于同样的原因,容器未启动。稍后启动容器时,它会使用 reference containerAdmin来声明元素。

3.3.2.5 批处理消息

批处理的消息(由生产者创建)由侦听器容器自动分批处理(使用springBatchFormat消息头)。拒绝批次中的任何消息都会导致整个批次被拒绝。有关批处理的更多信息,请参见批处理。

设置容器属性ConsumerBatchEnabled以启用此功能。 deBatchingEnabled也必须为true,以便容器负责处理这两种类型的批次。当ConsumerBatchEnabled为true时,实现BatchMessageListener或ChannelAwareBatchMessageListener。有关将此功能与@RabbitListener一起使用的信息,请参阅带有批处理的@RabbitListener。

3.3.2.6 消费者事件

每当侦听器(消费者)遇到某种类型的故障时,容器就会发布应用程序事件。事件ListenerContainerConsumerFailedEvent具有以下属性:

  • container: 消费者遇到问题的侦听器容器。
  • reason: 失败的文字原因。
  • fatal: 指示失败是否致命的布尔值。对于非致命异常,容器将根据recoveryInterval或recoveryBackoff(对于SimpleMessageListenerContainer)或monitorInterval(对于DirectMessageListenerContainer)尝试重新启动消费者。
  • throwable: 被捕获的Throwable。

如果消费者由于默认情况下不专门使用其队列以及发布事件而失败,那么将发出WARN日志。要更改此日志记录行为,请在SimpleMessageListenerContainer实例的ExclusiveConsumerExceptionLogger属性中提供自定义的ConditionalExceptionLogger。另请参阅记录通道关闭事件。

致命错误始终记录在ERROR级别。这是不可修改的。

在容器生命周期的各个阶段还发布了其他一些事件:

  • AsyncConsumerStartedEvent:启动消费者时。

  • AsyncConsumerRestartedEvent:消费者在发生故障后重新启动时-仅SimpleMessageListenerContainer。

  • AsyncConsumerTerminatedEvent:消费者正常停止时。

  • AsyncConsumerStoppedEvent:消费者停止时-仅SimpleMessageListenerContainer。

  • ConsumeOkEvent:当从Broker接收到一个consumerOk时,包含队列名称和consumerTag

  • ListenerContainerIdleEvent:请参阅检测空闲的异步消费者。

3.3.2.7 消费者标签

默认情况下,消费者标签由broker提供。你可以使用ConsumerTagStrategy实现自己的命名策略。

3.3.2.8 注解驱动的侦听器端点

异步接收消息的最简单方法是使用带注释的侦听器端点基础结构。简而言之,它使您可以将托管bean的方法公开为Rabbit侦听器端点。以下示例显示了如何使用@RabbitListener批注:

1
2
3
4
5
6
7
8
9
@Component
public class MyService {

@RabbitListener(queues = "myQueue")
public void processOrder(String data) {
...
}

}

上面这种方式,绑定了已经存在的队列。除了上述的方式,你还可以使用自动声明队列的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

@Component
public class MyService {

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "myQueue", durable = "true"),
exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
key = "orderRoutingKey")
)
public void processOrder(Order order) {
...
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "auto.exch"),
key = "invoiceRoutingKey")
)
public void processInvoice(Invoice invoice) {
...
}

@RabbitListener(queuesToDeclare = @Queue(name = "${my.queue}", durable = "true"))
public String handleWithSimpleDeclare(String data) {
...
}

}

在第一个示例中,队列myQueue与交换(如果需要)一起自动(持久)声明,并与路由键绑定到交换。在第二个示例中,声明并绑定了一个匿名(专有的,自动删除)队列。可以提供多个QueueBinding条目,让侦听器侦听多个队列。在第三个示例中,如有必要,将声明一个具有从属性my.queue检索到的名称的队列,并使用该队列名称作为路由键将其默认绑定到默认交换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "auto.headers", autoDelete = "true",
arguments = @Argument(name = "x-message-ttl", value = "10000",
type = "java.lang.Integer")),
exchange = @Exchange(value = "auto.headers", type = ExchangeTypes.HEADERS, autoDelete = "true"),
arguments = {
@Argument(name = "x-match", value = "all"),
@Argument(name = "thing1", value = "somevalue"),
@Argument(name = "thing2")
})
)
public String handleWithHeadersExchange(String foo) {
...
}

@RabbitListener属性参数:

  • String id
  • String containerFactory
  • String[] queues
  • Queue[] queuesToDeclare
  • boolean exclusive
  • String priority
  • String admin
  • QueueBinding[] bindings
    • Queue value
    • Exchange exchange
    • String[] key // routingKey
    • String ignoreDeclarationExceptions
    • Argument[] arguments
    • String declare // 如果admin存在,需要声明此属性
    • String[] admins
  • String group // 如果存在Listener容器,则可以将容器的beanName赋予此属性
  • String returnExceptions // 设置为true时,设置为true会导致Listener抛出的异常使用正常的replyto/SendTo语义发送给发送者,设置false时,将异常抛出到Listener容器并执行正常的重试/死信处理。
  • String errorHandler //如果Listener容器出现异常,则处理异常
  • String concurrency
  • String autoStartup // 延迟创建队列,交换器

3.3.2.9 创建自己的监听注解(元注解)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "metaFanout", type = ExchangeTypes.FANOUT)))
public @interface MyAnonFanoutListener {
}

public class MetaListener {

@MyAnonFanoutListener
public void handle1(String foo) {
...
}

@MyAnonFanoutListener
public void handle2(String foo) {
...
}

}

2.2.3 版本开始,支持@AliasFor以允许在元注释的注释上覆盖属性。而且,用户注释现在可以是@Repeatable的,从而允许为一个方法创建多个容器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Component
static class MetaAnnotationTestBean {

@MyListener("queue1")
@MyListener("queue2")
public void handleIt(String body) {
}

}


@RabbitListener
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Repeatable(MyListeners.class)
static @interface MyListener {

@AliasFor(annotation = RabbitListener.class, attribute = "queues")
String[] value() default {};

}

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
static @interface MyListeners {

MyListener[] value();

}

3.3.2.10 启用侦听器端点注释

要启用对@RabbitListener批注的支持,可以将@EnableRabbit添加到您的@Configuration类之一。以下示例显示了如何执行此操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
@EnableRabbit
public class AppConfig {

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
factory.setContainerCustomizer(container -> /* customize the container */);
return factory;
}
}

从2.0版开始,DirectMessageListenerContainerFactory也可用。它创建DirectMessageListenerContainer实例。

从2.2.2版开始,您可以提供ContainerCustomizer实现(如上所示)。创建和配置容器后,可以使用该方法进一步配置容器。例如,您可以使用它来设置容器工厂未公开的属性。

默认情况下,底层将查找名为RabbitListenerContainerFactory的bean作为工厂的源,以用于创建消息侦听器容器。在这种情况下,忽略了RabbitMQ底层设置,可以使用三个线程的核心池大小和十个线程的最大池大小来调用processOrder方法。

您可以定制用于每个注释的侦听器容器工厂,也可以通过实现RabbitListenerConfigurer接口来配置显式默认值。仅当至少一个端点在没有特定容器工厂的情况下注册时,才需要使用默认值。有关完整的详细信息和示例,请参见Javadoc。

容器工厂提供了添加MessagePostProcessor实例的方法,这些实例在接收消息之后(调用侦听器之前)和发送回复之前应用。

从版本2.0.6开始,可以将RetryTemplate和RecoveryCallback添加到侦听器容器工厂。发送回复时使用。重试用尽时将调用RecoveryCallback。您可以使用SendRetryContextAccessor从上下文中获取信息。以下示例显示了如何执行此操作:

1
2
3
4
5
6
7
8
factory.setRetryTemplate(retryTemplate);
factory.setReplyRecoveryCallback(ctx -> {
Message failed = SendRetryContextAccessor.getMessage(ctx);
Address replyTo = SendRetryContextAccessor.getAddress(ctx);
Throwable t = ctx.getLastThrowable();
...
return null;
});

从2.0版开始,@RabbitListener批注具有并发属性。它支持SpEL表达式(#{…})和属性占位符($ {…})。其含义和允许的值取决于容器类型,如下所示:

  • 对于DirectMessageListenerContainer,该值必须是单个整数值,该值设置容器上的consumersPerQueue属性。

  • 对于SimpleRabbitListenerContainer,该值可以是一个单个整数值,该值在容器上设置了concurrentConsumers属性,也可以具有m-n的形式,其中m是parallelConsumers属性,n是maxConcurrentConsumers属性。

无论哪种情况,此设置都将覆盖出厂设置。以前,如果您有需要不同并发性的侦听器,则必须定义不同的容器工厂。

该注释还允许通过autoStartup和executor(自2.2起)注释属性覆盖工厂的autoStartup和taskExecutor属性。为每个执行程序使用不同的执行程序可能有助于在日志和线程转储中标识与每个侦听器关联的线程。

1
2
3
4
5
6
7
@RabbitListener(id = "manual.acks.1", queues = "manual.acks.1", ackMode = "MANUAL")
public void manual1(String in, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {

...
channel.basicAck(tag, false);
}

3.3.2.11 带注释方法的消息转换

调用侦听器之前,管道中有两个转换步骤。第一步使用MessageConverter将传入的Spring AMQP消息转换为Spring消息。调用目标方法时,如有必要,消息将转换为方法参数类型。

第一步的默认MessageConverter是Spring AMQP SimpleMessageConverter,它处理到String和java.io.Serializable对象的转换。所有其他的都保留为byte []。在下面的讨论中,我们称其为“消息转换器”。

第二步的默认转换器是GenericMessageConverter,它委派给转换服务(DefaultFormattingConversionService的实例)。在下面的讨论中,我们将其称为“方法参数转换器”。

1
2
3
4
5
6
7
8
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
...
factory.setMessageConverter(new Jackson2JsonMessageConverter());
...
return factory;
}

在1.6之前的版本中,必须在消息头中提供用于转换JSON的类型信息,或者需要自定义ClassMapper。从版本1.6开始,如果没有类型信息标头,则可以从目标方法参数中推断类型。

自定义消息转换器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

@Configuration
@EnableRabbit
public class AppConfig implements RabbitListenerConfigurer {

...

@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(new GenericMessageConverter(myConversionService()));
return factory;
}

@Bean
public ConversionService myConversionService() {
DefaultConversionService conv = new DefaultConversionService();
conv.addConverter(mySpecialConverter());
return conv;
}

@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}

...

}

3.3.2.12 程序化端点注册

RabbitListenerEndpoint提供了Rabbit端点的模型,并负责为该模型配置容器。除了RabbitListener批注检测到的端点之外,基础结构还允许您以编程方式配置端点。以下示例显示了如何执行此操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
@EnableRabbit
public class AppConfig implements RabbitListenerConfigurer {

@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setQueueNames("anotherQueue");
endpoint.setMessageListener(message -> {
// processing
});
registrar.registerEndpoint(endpoint);
}
}

在前面的示例中,我们使用了SimpleRabbitListenerEndpoint,它提供了实际的MessageListener来调用,但是您也可以构建自己的端点变体来描述自定义调用机制。

应该注意的是,您也可以完全跳过@RabbitListener的使用,并通过RabbitListenerConfigurer以编程方式注册端点。

3.3.2.13 带注释的端点方法签名

到目前为止,我们已经在端点中注入了一个简单的String,但实际上它可以具有非常灵活的方法签名。以下示例将其重写为使用自定义标头注入Order:

1
2
3
4
5
6
7
8
@Component
public class MyService {

@RabbitListener(queues = "myQueue")
public void processOrder(Order order, @Header("order_type") String orderType) {
...
}
}

以下列表显示了可以在侦听器端点中注入的主要元素:

  • 原始的org.springframework.amqp.core.Message。

  • 接收消息的com.rabbitmq.client.Channel。

  • 代表传入的AMQP消息的org.springframework.messaging.Message。请注意,此消息同时包含自定义标头和标准标头(由AmqpHeaders定义)。

例如,如果我们想在处理订单之前确保其有效,则可以使用@Valid注释有效负载并配置必要的验证器,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Configuration
@EnableRabbit
public class AppConfig implements RabbitListenerConfigurer {

@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}

@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setValidator(myValidator());
return factory;
}
}

3.3.2.14 监听多个队列

1
2
3
4
5
6
7
8
9
10

@Component
public class MyService {

@RabbitListener(queues = { "queue1", "queue2" } )
public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
...
}

}
1
2
3
4
5
6
7
8
9
@Component
public class MyService {

@RabbitListener(queues = "#{'${property.with.comma.delimited.queue.names}'.split(',')}" )
public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
...
}

}

3.3.2.15 回复管理

MessageListenerAdapter中的现有支持已经使您的方法具有非空返回类型。在这种情况下,调用的结果将封装在一条消息中,该消息发送到原始消息的ReplyToAddress标头中指定的地址或侦听器上配置的默认地址。您可以使用消息传递抽象的@SendTo批注设置该默认地址。

假设我们的processOrder方法现在应该返回一个OrderStatus,我们可以如下编写它以自动发送回复:

1
2
3
4
5
6
@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}

如果需要以与传输无关的方式设置其他标头,则可以返回一条Message,如下所示:

1
2
3
4
5
6
7
8
9
10

@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
// order processing
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}

或者,可以在beforeSendReplyMessagePostProcessors容器工厂属性中使用MessagePostProcessor添加更多标头。从版本2.2.3开始,在答复消息中可以使用被调用的bean/方法,可以在消息后处理器中使用它来将信息传达回调用方:

1
2
3
4
5
6
7
factory.setBeforeSendReplyPostProcessors(msg -> {
msg.getMessageProperties().setHeader("calledBean",
msg.getMessageProperties().getTargetBean().getClass().getSimpleName());
msg.getMessageProperties().setHeader("calledMethod",
msg.getMessageProperties().getTargetMethod().getName());
return m;
});

从版本2.2.5开始,您可以配置ReplyPostProcessor在发送回复消息之前对其进行修改;在为相关请求设置了relatedId标头之后,将调用此方法。

1
2
3
4
5
6
7
8
9
10
11
12
@RabbitListener(queues = "test.header", group = "testGroup", replyPostProcessor = "echoCustomHeader")
public String capitalizeWithHeader(String in) {
return in.toUpperCase();
}

@Bean
public ReplyPostProcessor echoCustomHeader() {
return (req, resp) -> {
resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
return resp;
};
}

@SendTo值被假定为跟随exchange / routingKey模式的回复交换和routingKey对,其中这些部分之一可以省略。有效值如下:

something1 / thing2:replyTo交换和routingKey。 something1 /:replyTo交换和默认(空)routingKey。 something2或/ thing2:replyTo routingKey和默认(空)交换。 /或为空:replyTo默认交换和默认routingKey。

此外,您可以使用不带值属性的@SendTo。这种情况下等于一个空的sendTo模式。仅当入站邮件没有ReplyToAddress属性时才使用@SendTo。

从版本1.5开始,@SendTo值可以是bean初始化SpEL表达式,如以下示例所示:

1
2
3
4
5
6
7
8
9
10
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
return "test.sendTo.reply.spel";
}

对于动态回复路由,消息发送者应包括reply_to消息属性或使用备用运行时SpEL表达式(在下一个示例之后进行描述)。

从版本1.6开始,@SendTo可以是SpEL表达式,在运行时将根据请求和答复对它进行评估,如以下示例所示:

1
2
3
4
5
6

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.' + result.queueName}")
public Bar capitalizeWithSendToSpel(Foo foo) {
return processTheFooAndReturnABar(foo);
}

SpEL表达式的运行时性质由!{…}分隔符指示。表达式的评估上下文#root对象具有三个属性:

  • request:o.s.amqp.core.Message请求对象。

  • 来源:转换后的o.s.messaging.Message <?>。

  • 结果:方法结果。

上下文具有一个map属性访问器,一个标准类型转换器和一个bean解析器,该解析器允许引用上下文中的其他bean(例如@ someBeanName.determineReplyQ(request,result))。

总而言之,#{…}在初始化期间被初始化一次,其中#root对象是应用程序上下文。 Bean由其名称引用。在运行时为每条消息评估!{…},根对象具有前面列出的属性。 Bean的名称以@开头。

从2.1版开始,还支持简单的属性占位符(例如$ {some.reply.to})。对于早期版本,可以将以下内容用作替代方法,如以下示例所示:

1
2
3
4
5
6
@RabbitListener(queues = "foo")
@SendTo("#{environment['my.send.to']}")
public String listen(Message in) {
...
return ...
}

3.3.2.16 多方法侦听器

从1.5.0版开始,您可以在类级别指定@RabbitListener注释。与新的@RabbitHandler批注一起,这使单个侦听器可以根据传入消息的有效负载类型调用不同的方法。最好用一个例子来描述:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@RabbitListener(id="multi", queues = "someQueue")
@SendTo("my.reply.queue")
public class MultiListenerBean {

@RabbitHandler
public String thing2(Thing2 thing2) {
...
}

@RabbitHandler
public String cat(Cat cat) {
...
}

@RabbitHandler
public String hat(@Header("amqp_receivedRoutingKey") String rk, @Payload Hat hat) {
...
}

@RabbitHandler(isDefault = true)
public String defaultMethod(Object object) {
...
}

}

@Repeatable @RabbitListener
从版本1.6开始,@RabbitListener批注标记为@Repeatable。这意味着注释可以多次出现在相同的注释元素(方法或类)上。在这种情况下,将为每个注释创建一个单独的侦听器容器,每个注释容器都调用相同的侦听器@Bean。可重复的注释可以与Java 8或更高版本一起使用。使用Java 7或更早版本时,可以通过使用@RabbitListeners“容器”注释和@RabbitListener注释数组来达到相同的效果。

3.3.2.17 代理@RabbitListener和泛型

如果要代理您的服务(例如,对于@Transactional),则在接口具有通用参数时,应牢记一些注意事项。考虑以下示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

interface TxService<P> {

String handle(P payload, String header);

}

static class TxServiceImpl implements TxService<Foo> {

@Override
@RabbitListener(...)
public String handle(Foo foo, String rk) {
...
}

}

使用通用接口和特定实现,您将不得不切换到CGLIB目标类代理,因为接口句柄方法的实际实现是桥接方法。在事务管理的情况下,通过使用注释选项来配置CGLIB的使用:@EnableTransactionManagement(proxyTargetClass = true)。在这种情况下,必须在实现中的目标方法上声明所有注释,如以下示例所示:

1
2
3
4
5
6
7
8
9
10
11

static class TxServiceImpl implements TxService<Foo> {

@Override
@Transactional
@RabbitListener(...)
public String handle(@Payload Foo foo, @Header("amqp_receivedRoutingKey") String rk) {
...
}

}

3.3.2.18 处理异常

默认情况下,如果带注释的侦听器方法引发异常,则将其引发到容器中,并根据容器和代理的配置,将消息重新排队并重新传递,丢弃或路由到死信交换。什么也没有退还给发送者。

从2.0版开始,@RabbitListener批注具有两个新属性:errorHandler和returnExceptions。

这些默认情况下未配置。

您可以使用errorHandler来提供RabbitListenerErrorHandler实现的bean名称。此功能接口有一种方法,如下所示:

1
2
3
4
5
6
7
@FunctionalInterface
public interface RabbitListenerErrorHandler {

Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message,
ListenerExecutionFailedException exception) throws Exception;

}

如您所见,您可以访问从容器接收的原始消息,由消息转换器生成的spring-messaging Message <?>对象以及由侦听器引发的异常(包装在ListenerExecutionFailedException中)。错误处理程序可以返回某些结果(作为答复发送),也可以引发原始异常或新异常(根据returnExceptions设置,将其抛出到容器中或返回给发送者)。

returnExceptions属性为true时,会将异常返回给发送方。异常包装在RemoteInvocationResult对象中。在发送方,有一个可用的RemoteInvocationAwareMessageConverterAdapter,如果将其配置到RabbitTemplate中,则会重新引发服务器端异常,并包装在AmqpRemoteException中。服务器异常的堆栈跟踪是通过合并服务器和客户端堆栈跟踪来综合的。

该机制通常仅适用于使用Java序列化的默认SimpleMessageConverter。异常通常不是“杰克逊友好的”,并且不能序列化为JSON。如果使用JSON,请考虑在引发异常时使用errorHandler返回其他一些Jackson友好的Error对象。

在版本2.1中,此接口从软件包o.s.amqp.rabbit.listener移至o.s.amqp.rabbit.listener.api。

从版本2.1.7开始,通道在消息头中可用。使用AcknowledgeMode.MANUAL时,这允许您确认或拒绝失败的消息:

1
2
3
4
5
6
7
public Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message,
ListenerExecutionFailedException exception) {
...
message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class)
.basicReject(message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class),
true);
}

3.3.2.19 容器管理

为注释创建的容器未在应用程序上下文中注册。您可以通过调用RabbitListenerEndpointRegistry bean上的getListenerContainers()获得所有容器的集合。然后,您可以遍历此集合,例如,停止或启动所有容器,或在注册表本身上调用Lifecycle方法,这将在每个容器上调用操作。

您还可以通过使用单个容器的ID来获取对单个容器的引用,例如,使用上面的代码段创建的容器,使用getListenerContainer(String id)-例如,registry.getListenerContainer(“ multi”)。

从1.5.2版开始,您可以使用getListenerContainerIds()获得已注册容器的ID值。

从1.5版开始,您现在可以将组分配给RabbitListener端点上的容器。这提供了一种获取对容器子集的引用的机制。添加组属性会导致将Collection 类型的Bean注册到具有组名的上下文中。

3.3.2.20 批量接收消息 @RabbitmqListener

当接收到一批消息时,通常由容器执行分批处理,并且一次仅用一条消息来调用侦听器。从2.2版开始,您可以将侦听器容器工厂和侦听器配置为在一个调用中接收整个批次,只需设置工厂的batchListener属性,并将方法有效负载参数设为List:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setBatchListener(true);
return factory;
}

@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
...
}

// or

@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
...
}

将batchListener属性设置为true会自动关闭工厂创建的容器中的deBatchingEnabled容器属性(除非ConsumerBatchEnabled为true,请参见下文)。实际上,分批处理已从容器移动到侦听器适配器,并且适配器创建了传递到侦听器的列表。

启用批处理的工厂不能与多方法侦听器一起使用。

同样从2.2版开始。一次接收批处理的消息时,最后一条消息包含设置为true的布尔标头。可以通过将@Header(AmqpHeaders.LAST_IN_BATCH)布尔值last`参数添加到您的侦听器方法中来获取此标头。头是从MessageProperties.isLastInBatch()映射的。此外,AmqpHeaders.BATCH_SIZE填充有每个消息片段中的批处理大小。

此外,新属性consumerBatchEnabled已添加到SimpleMessageListenerContainer。如果为真,则容器将创建一批消息,最大为batchSize;如果receiveTimeout过去了,但没有新消息到达,则将分批交付。如果收到生产者创建的批次,则将其分批并添加到消费者方批次;因此,实际传递的消息数可能超过batchSize,该大小代表从代理接收到的消息数。当ConsumerBatchEnabled为true时,deBatchingEnabled必须为true;否则为false。集装箱工厂将执行此要求。

1
2
3
4
5
6
7
8
9
10
@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setConsumerTagStrategy(consumerTagStrategy());
factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
return factory;
}

当将consumerBatchEnabled与@RabbitListener一起使用时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
this.amqpMessagesReceived = amqpMessages;
this.batch1Latch.countDown();
}

@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
this.messagingMessagesReceived = messages;
this.batch2Latch.countDown();
}

@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
this.batch3Strings = strings;
this.batch3Latch.countDown();
}

  • 第一个是使用未转换的原始org.springframework.amqp.core.Message调用的。

  • 第二个是通过org.springframework.messaging.Message <?>调用的,其中包含转换后的有效负载和映射的标头/属性。

  • 第三个调用是使用转换后的有效负载进行的,而无法访问标头/属性。

您还可以添加一个通道参数,该参数通常在使用手动确认模式时使用。这对于第三个示例不是很有用,因为您无权访问delivery_tag属性。

3.3.2.21 使用容器工厂

引入了侦听器容器工厂以支持@RabbitListener并向RabbitListenerEndpointRegistry注册容器,如“编程端点注册”中所述。

从2.1版开始,它们可用于创建任何侦听器容器-甚至是没有侦听器的容器(例如在Spring Integration中使用的容器)。当然,必须在启动容器之前添加侦听器。

有两种创建此类容器的方法:

  • 使用SimpleRabbitListenerEndpoint

  • 创建后添加侦听器

1
2
3
4
5
6
7
8
9
10
@Bean
public SimpleMessageListenerContainer factoryCreatedContainerSimpleListener(
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory) {
SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setQueueNames("queue.1");
endpoint.setMessageListener(message -> {
...
});
return rabbitListenerContainerFactory.createListenerContainer(endpoint);
}
1
2
3
4
5
6
7
8
9
10
@Bean
public SimpleMessageListenerContainer factoryCreatedContainerNoListener(
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory) {
SimpleMessageListenerContainer container = rabbitListenerContainerFactory.createListenerContainer();
container.setMessageListener(message -> {
...
});
container.setQueueNames("test.no.listener.yet");
return container;
}

无论哪种情况,侦听器也可以是ChannelAwareMessageListener,因为它现在是MessageListener的子接口。

如果您希望创建多个具有相似属性的容器或使用预先配置的容器工厂(例如,Spring Boot自动配置提供的容器或同时使用两者),则这些技术很有用。

用这种方法创建的容器是普通的@Bean实例,并且未在RabbitListenerEndpointRegistry中注册。

3.3.2.22 异步@RabbitListener返回类型

从2.1版开始,可以使用异步返回类型ListenableFuture <?>和Mono <?>指定@RabbitListener(和@RabbitHandler)方法,从而使答复以异步方式发送。

必须使用AcknowledgeMode.MANUAL配置侦听器容器工厂,以便消费者线程不会确认该消息。相反,异步完成将在异步操作完成时确认或拒绝消息。当异步结果有错误完成时,是否重新排队消息取决于抛出的异常类型,容器配置和容器错误处理程序。默认情况下,除非容器的defaultRequeueRejected属性设置为false(默认情况下为true),否则将重新排队该消息。如果异步结果是通过AmqpRejectAndDontRequeueException完成的,则不会重新排队该消息。如果容器的defaultRequeueRejected属性为false,则可以通过将Future的异常设置为InstantRequeueException来覆盖该值,然后将消息重新排队。如果在侦听器方法中发生了一些阻止创建异步结果对象的异常,则您必须捕获该异常并返回适当的返回对象,该对象将导致消息被确认或重新排队。

3.3.2.23 线程和异步消费者

当RabbitMQ Client传递新消息时,使用SimpleMessageListenerContainer中配置的TaskExecutor的线程来调用MessageListener。如果未配置,则使用SimpleAsyncTaskExecutor。如果使用池执行器,则需要确保池大小足以处理配置的并发。使用DirectMessageListenerContainer,可以直接在RabbitMQ客户端线程上调用MessageListener。在这种情况下,taskExecutor用于监视消费者的任务。

使用默认的SimpleAsyncTaskExecutor时,对于在其上调用侦听器的线程,在threadNamePrefix中使用侦听器容器beanName。这对于日志分析很有用。我们通常建议始终在日志记录附加程序配置中包括线程名称。通过容器上的taskExecutor属性专门提供TaskExecutor时,将按原样使用它,而无需进行修改。建议您使用类似的技术来命名由自定义TaskExecutor bean定义创建的线程,以帮助在日志消息中标识线程。

创建连接时,在CachingConnectionFactory中配置的Executor将传递到RabbitMQ Client中,并且其线程用于将新消息传递到侦听器容器。如果未配置,则客户端使用内部线程池执行程序,该执行程序的池大小为5。

使用DirectMessageListenerContainer,您需要确保连接工厂配置有任务执行程序,该任务执行程序具有足够的线程来支持使用该工厂的所有侦听器容器之间的所需并发。默认池大小只有五个。
RabbitMQ客户端使用ThreadFactory创建用于低级I / O(套接字)操作的线程。要修改此工厂,您需要配置基础RabbitMQ ConnectionFactory,如配置基础客户端连接工厂中所述。

3.3.2.24 选择一个容器

2.0版引入了DirectMessageListenerContainer(DMLC)。以前,只有SimpleMessageListenerContainer(SMLC)可用。 SMLC为每个消费者使用内部队列和专用线程。如果将一个容器配置为侦听多个队列,则使用相同的消费者线程来处理所有队列。并发是由并发消费和其他属性控制的。当消息从RabbitMQ客户端到达时,客户端线程将它们通过队列传递给使用方线程。之所以需要这种架构,是因为在RabbitMQ客户端的早期版本中,无法同时进行多个交付。较新版本的客户端具有修订的线程模型,现在可以支持并发。这允许引入DMLC,现在可以在RabbitMQ客户端线程上直接调用侦听器。因此,它的体系结构实际上比SMLC“简单”。但是,这种方法存在一些局限性,并且DMLC无法使用SMLC的某些功能。另外,并发由consumersPerQueue(和客户端库的线程池)控制。 parallelConsumers和关联的属性不适用于此容器。

以下功能可用于SMLC,但不可用于DMLC:

  • batchSize:使用SMLC,您可以将其设置为控制在一个事务中传递多少消息或减少确认的数量,但是在失败后可能导致重复传递的数量增加。 (DMLC确实具有messagesPerAck,可用于减少确认,与batchSize和SMLC相同,但不能与事务一起使用-—每条消息都在单独的事务中传递和确认)。

  • ConsumerBatchEnabled:在消费者中批量处理离散消息;有关更多信息,请参见消息侦听器容器配置。

  • maxConcurrentConsumers和消费者缩放间隔或触发器-DMLC中没有自动缩放。但是,它确实允许您以编程方式更改consumersPerQueue属性,并相应地调整消费者。

但是,DMLC比SMLC具有以下优点:

  • 在运行时添加和删除队列更加有效。使用SMLC,将重新启动整个消费者线程(取消并重新创建所有消费者)。使用DMLC,不会取消未受影响的消费者。

  • 避免在RabbitMQ客户端线程和消费者线程之间进行上下文切换。

  • 线程在消费者之间共享,而不是在SMLC中为每个消费者使用专用线程。但是,请参阅“线程和异步消费者”中有关连接工厂配置的重要说明。

有关哪些配置属性适用于每个容器的信息,请参见消息侦听器容器配置。

3.3.2.25 检测空闲的异步消费者

尽管效率很高,但异步消费者的一个问题是检测它们何时处于空闲状态—如果一段时间内没有消息到达,用户可能希望采取某些措施。

从1.6版开始,现在可以将侦听器容器配置为在一段时间没有消息传递的情况下发布ListenerContainerIdleEvent。当容器处于空闲状态时,每隔idleEventInterval毫秒发布一次事件。

要配置此功能,请在容器上设置idleEventInterval。以下示例显示了如何使用XML和Java(对于SimpleMessageListenerContainer和SimpleRabbitListenerContainerFactory)执行此操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Bean
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
...
container.setIdleEventInterval(60000L);
...
return container;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setIdleEventInterval(60000L);
...
return factory;
}

3.3.2.26 消费事件

您可以通过实现ApplicationListener来捕获空闲事件-可以是常规侦听器,也可以是仅侦听仅接收此特定事件的侦听器。您还可以使用Spring Framework 4.2中引入的@EventListener。

以下示例将@RabbitListener和@EventListener组合到一个类中。您需要了解应用程序侦听器会获取所有容器的事件,因此,如果您要基于哪个容器处于空闲状态采取特定的操作,则可能需要检查侦听器ID。您也可以为此使用@EventListener条件。

事件具有四个属性:

  • 来源:侦听器容器实例

  • id:侦听器ID(或容器bean名称)

  • idleTime:事件发布时容器空闲的时间

  • queueNames:容器侦听的队列的名称

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Listener {

@RabbitListener(id="someId", queues="#{queue.name}")
public String listen(String foo) {
return foo.toUpperCase();
}

@EventListener(condition = "event.listenerId == 'someId'")
public void onApplicationEvent(ListenerContainerIdleEvent event) {
...
}

}

事件侦听器查看所有容器的事件。因此,在前面的示例中,我们根据侦听器ID缩小了接收到的事件的范围。

如果希望使用idle事件停止列表器容器,则不应在调用侦听器的线程上调用container.stop()。这样做总是会导致延迟和不必要的日志消息。相反,您应该将事件移交给另一个线程,然后该线程可以停止容器。

3.3.2.27 监视侦听器性能

从2.2版开始,如果在类路径上检测到Micrometer且应用程序上下文中存在MeterRegistry,则侦听器容器将自动为侦听器创建和更新Micrometer Timer。可以通过将容器属性micrometerEnabled设置为false来禁用计时器。

维护两个计时器-一个用于成功调用侦听器,另一个用于失败。使用简单的MessageListener,每个配置的队列都有一对计时器。

这些计时器名为spring.rabbitmq.listener,具有以下标记:

  • listenerId :(侦听器ID或容器Bean名称)

  • queue :(当consumerBatchEnabled为true时,一个简单侦听器的队列名称或已配置队列名称的列表-因为批处理可能包含来自多个队列的消息)

  • 结果:成功或失败

  • 异常:无或ListenerExecutionFailedException

您可以使用micrometerTags容器属性添加其他标签。

3.3.3 容器和broker命名的队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Bean
public Queue queue() {
return new Queue("", false, true, true);
}

@Bean
public SimpleMessageListenerContainer container() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf());
container.setQueues(queue());
container.setMessageListener(m -> {
...
});
container.setMissingQueuesFatal(false);
return container;
}

重置连接并建立新连接后,新队列将获得新名称。由于在容器重启和重新声明队列之间存在竞争状态,因此将容器的missingQueuesFatal属性设置为false很重要,因为容器可能最初会尝试重新连接到旧队列。

3.3.4 消息转换器

通常不建议依赖Java序列化。 JSON(JavaScript对象表示法)是一种更通用,更灵活且可跨不同语言和平台移植的替代方法。可以在任何RabbitTemplate实例上配置该转换器,以覆盖其对SimpleMessageConverter默认值的用法。 Jackson2JsonMessageConverter使用com.fasterxml.jackson 2.x库。以下示例配置了Jackson2JsonMessageConverter:

Jackson2JsonMessageConverter默认情况下使用DefaultClassMapper。类型信息将添加到MessageProperties(并从中检索)。如果入站邮件在MessageProperties中不包含类型信息,但是您知道期望的类型,则可以使用defaultType属性配置静态类型,如以下示例所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
jsonConverter.setClassMapper(classMapper());
return jsonConverter;
}

@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("thing1", Thing1.class);
idClassMapping.put("thing2", Thing2.class);
classMapper.setIdClassMapping(idClassMapping);
return classMapper;
}

根据发送系统添加到标头的类型信息,将入站消息转换为对象。

在1.6之前的版本中,如果不存在类型信息,则转换将失败。从版本1.6开始,如果缺少类型信息,则转换器将使用Jackson的默认值(通常是地图)来转换JSON。

同样,从版本1.6开始,当您使用@RabbitListener批注(在方法上)时,推断的类型信息将添加到MessageProperties。这使转换器可以转换为目标方法的参数类型。仅当存在一个没有注释的参数或带有@Payload注释的单个参数时,这才适用。在分析期间,将忽略Message类型的参数。

默认情况下,推断的类型信息将覆盖发送系统创建的入站TypeId和相关标头。这使接收系统可以自动转换为其他域对象。仅当参数类型是具体的(不是抽象或接口)或来自java.util包时才适用。在所有其他情况下,将使用TypeId和相关标头。在某些情况下,您可能希望覆盖默认行为并始终使用TypeId信息。例如,假设您有一个@RabbitListener,它带有Thing1参数,但消息中包含Thing2,它是Thing1的子类(具体)。推断的类型将不正确。要处理这种情况,请将Jackson2JsonMessageConverter的TypePrecedence属性设置为TYPE_ID,而不是默认的INFERRED。 (该属性实际上位于转换器的DefaultJackson2JavaTypeMapper上,但为方便起见,在转换器上提供了一个setter。)如果注入自定义类型映射器,则应改为在映射器上设置该属性。

从Message转换时,传入的MessageProperties.getContentType()必须与JSON兼容(使用contentType.contains(“ json”)进行检查)。从2.2版开始,如果没有contentType属性,或者它具有默认值application / octet-stream,则假定为application / json。要恢复到以前的行为(返回未转换的字节[]),请将转换器的假定的SupportedContentType属性设置为false。如果不支持内容类型,则发出WARN日志消息“无法转换内容类型为[…]的传入消息”,并且按原样以字节[]形式返回message.getBody()。因此,为了满足消费者方面的Jackson2JsonMessageConverter要求,生产者必须添加contentType消息属性,例如,作为application / json或text / x-json或通过使用Jackson2JsonMessageConverter自动设置标头。下面的清单显示了许多转换器调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

@RabbitListener
public void thing1(Thing1 thing1) {...}

@RabbitListener
public void thing1(@Payload Thing1 thing1, @Header("amqp_consumerQueue") String queue) {...}

@RabbitListener
public void thing1(Thing1 thing1, o.s.amqp.core.Message message) {...}

@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message<Foo> message) {...}

@RabbitListener
public void thing1(Thing1 thing1, String bar) {...}

@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message<?> message) {...}

在上述清单的前四种情况下,转换器尝试转换为Thing1类型。第五个示例无效,因为我们无法确定哪个参数应接收消息有效负载。在第六个示例中,由于通用类型为通配符类型,因此应用了Jackson的默认值。

但是,您可以创建一个自定义转换器,并使用targetMethod消息属性来确定将JSON转换为哪种类型。

仅当在方法级别声明@RabbitListener批注时,才能实现此类型推断。对于类级别的@RabbitListener,转换后的类型用于选择要调用的@RabbitHandler方法。因此,基础结构提供了targetObject消息属性,您可以在自定义转换器中使用该属性来确定类型。

从1.6.11版本开始,Jackson2JsonMessageConverter以及DefaultJackson2JavaTypeMapper(DefaultClassMapper)提供了TrustedPackages选项来克服序列化小工具漏洞。默认情况下,为了实现向后兼容,Jackson2JsonMessageConverter信任所有软件包,即使用*作为选项。

3.3.5.1 使用Spring Data 映射接口

从2.2版开始,您可以将JSON转换为Spring Data Projection接口,而不是具体类型。这允许非常选择性和低耦合的数据绑定,包括从JSON文档中的多个位置查找值。例如,可以将以下接口定义为消息有效负载类型:

1
2
3
4
5
6
interface SomeSample {

@JsonPath({ "$.username", "$.user.name" })
String getUsername();

}
1
2
3
4
5
@RabbitListener(queues = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}

默认情况下,访问器方法将用于在接收到的JSON文档中查找属性名称作为字段。 @JsonPath表达式允许自定义值查找,甚至定义多个JSON路径表达式,以便从多个位置查找值,直到表达式返回实际值为止。

要启用此功能,请在消息转换器上将useProjectionForInterfaces设置为true。您还必须将spring-data:spring-data-commons和com.jayway.jsonpath:json-path添加到类路径。

当用作@RabbitListener方法的参数时,接口类型将像往常一样自动传递给转换器。

3.3.5.2 使用RabbitTemplate从消息转换

如前所述,类型信息在消息头中传送,以帮助从消息转换时的转换器。在大多数情况下,这可以正常工作。但是,使用泛型类型时,它只能转换简单对象和已知的“容器”对象(列表,数组和映射)。从2.0版开始,Jackson2JsonMessageConverter实现了SmartMessageConverter,它可以与带有ParameterizedTypeReference参数的新RabbitTemplate方法一起使用。这允许转换复杂的泛型类型,如以下示例所示:

1
2
Thing1<Thing2<Cat, Hat>> thing1 =
rabbitTemplate.receiveAndConvert(new ParameterizedTypeReference<Thing1<Thing2<Cat, Hat>>>() { });

3.3.5.3 MarshallingMessageConverter

另一个选项是MarshallingMessageConverter。它委托Spring OXM库实现Marshaller和Unmarshaller策略接口的实现。您可以在此处阅读有关该库的更多信息。就配置而言,最常见的是仅提供构造函数参数,因为Marshaller的大多数实现也实现了Unmarshaller。以下示例显示如何配置MarshallingMessageConverter:

1
2
3
4
5
6
7
8
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
<constructor-arg ref="someImplemenationOfMarshallerAndUnmarshaller"/>
</bean>
</property>
</bean>

3.3.5.4 Jackson2XmlMessageConverter

此类在版本2.1中引入,可用于在XML之间来回转换消息。

Jackson2XmlMessageConverter和Jackson2JsonMessageConverter具有相同的基类:AbstractJackson2MessageConverter。

引入AbstractJackson2MessageConverter类以替换已删除的类:AbstractJsonMessageConverter。

Jackson2XmlMessageConverter使用com.fasterxml.jackson 2.x库。

您可以使用它与Jackson2JsonMessageConverter相同的方式使用它,但是它支持XML而不是JSON。以下示例配置了Jackson2JsonMessageConverter:

1
2
3
4
5
6
7
8
<bean id="xmlConverterWithDefaultType"
class="org.springframework.amqp.support.converter.Jackson2XmlMessageConverter">
<property name="classMapper">
<bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
<property name="defaultType" value="foo.PurchaseOrder"/>
</bean>
</property>
</bean>

3.3.5.5 ContentTypeDelegatingMessageConverter

此类在版本1.4.2中引入,并允许基于MessageProperties中的内容类型属性委派到特定的MessageConverter。默认情况下,如果没有contentType属性或没有与配置的转换器匹配的值,它将委派给SimpleMessageConverter。以下示例配置ContentTypeDelegatingMessageConverter:

1
2
3
4
5
6
7
8
9

<bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter">
<property name="delegates">
<map>
<entry key="application/json" value-ref="jsonMessageConverter" />
<entry key="application/xml" value-ref="xmlMessageConverter" />
</map>
</property>
</bean>

3.3.5.6 Java 反序列化

从不受信任的来源反序列化Java对象时,可能存在一个漏洞。

如果您接受内容类型为application / x-java-serialized-object的不受信任来源的消息,则应考虑配置允许反序列化哪些包和类。当它配置为隐式或通过配置使用DefaultDeserializer时,这对SimpleMessageConverter和SerializerMessageConverter均适用。

默认情况下,白名单为空,这意味着所有类都将反序列化。

您可以设置模式列表,例如thing1。,thing1.thing2.Cat或.MySafeClass。

顺序检查模式,直到找到匹配项。如果不匹配,则抛出SecurityException。

您可以使用这些转换器上的whiteListPatterns属性设置模式。

3.3.5.7 消息属性转换

MessagePropertiesConverter策略接口用于在Rabbit Client BasicProperties和Spring AMQP MessageProperties之间进行转换。默认实现(DefaultMessagePropertiesConverter)通常可以满足大多数目的,但是您可以根据需要实现自己的实现。当大小不大于1024字节时,默认属性转换器将LongString类型的BasicProperties元素转换为String实例。较大的LongString实例不会转换(请参阅下一段)。可以使用构造函数参数来覆盖此限制。

从1.6版开始,长于长字符串限制(默认值:1024)的标头现在默认情况下由DefaultMessagePropertiesConverter保留为LongString实例。您可以通过getBytes [],toString()或getStream()方法访问内容。

以前,DefaultMessagePropertiesConverter将此类标头“转换”为DataInputStream(实际上它只是引用了LongString实例的DataInputStream)。在输出时,未转换此标头(通过在流上调用toString()转换为String除外,例如java.io.DataInputStream@1d057a39)。

现在,大型输入LongString标头现在也可以在输出中正确“转换”(默认情况下)。

提供了一个新的构造函数,使您可以配置转换器以像以前一样工作。以下清单显示了Javadoc注释和方法声明:

1
2
3
4
5
6
7
8
9
10
/**
* Construct an instance where LongStrings will be returned
* unconverted or as a java.io.DataInputStream when longer than this limit.
* Use this constructor with 'true' to restore pre-1.6 behavior.
* @param longStringLimit the limit.
* @param convertLongLongStrings LongString when false,
* DataInputStream when true.
* @since 1.6
*/
public DefaultMessagePropertiesConverter(int longStringLimit, boolean convertLongLongStrings) { ... }

同样从1.6版开始,已将一个名为correlationIdString的新属性添加到MessageProperties。以前,在RabbitMQ客户端使用的BasicProperties之间来回转换时,执行了不必要的byte [] <→String转换,因为MessageProperties.correlationId是byte [],但是BasicProperties使用String。 (最终,RabbitMQ客户端使用UTF-8将String转换为字节以放入协议消息中)。

为了提供最大的向后兼容性,已将一个名为correlationIdPolicy的新属性添加到DefaultMessagePropertiesConverter。这需要一个DefaultMessagePropertiesConverter.CorrelationIdPolicy枚举参数。默认情况下,它设置为BYTES,它复制了以前的行为。

对于入站消息:

  • STRING:仅映射relatedIdString属性

  • BYTES:仅关联ID属性被映射

  • BOTH:映射两个属性

对于出站消息:

  • STRING:仅映射relatedIdString属性

  • BYTES:仅关联ID属性被映射

  • BOTH:都考虑了这两个属性,其中String属性优先

同样从1.6版开始,入站deliveryMode属性不再映射到MessageProperties.deliveryMode。而是将其映射到MessageProperties.receivedDeliveryMode。此外,入站userId属性不再映射到MessageProperties.userId。而是将其映射到MessageProperties.receivedUserId。如果将同一MessageProperties对象用于出站消息,则这些更改是为了避免这些属性的意外传播。

从2.2版开始,DefaultMessagePropertiesConverter使用getName()而不是toString()转换类型为Class <?>类型的任何自定义headers。这避免了使用应用程序必须从toString()表示形式中解析类名称的麻烦。对于滚动升级,您可能需要更改消费者以了解两种格式,直到所有生产者都升级为止。

3.3.6 修改消息-压缩等

存在许多扩展点。它们使您可以在将消息发送到RabbitMQ之前或在收到消息之后立即对其进行一些处理。

在消息转换器中可以看到,这样的扩展点之一是在AmqpTemplate convertAndReceive操作中,您可以在其中提供MessagePostProcessor。例如,在转换POJO之后,MessagePostProcessor允许您在Message上设置自定义标头或属性。

从版本1.4.2开始,已将其他扩展点添加到RabbitTemplate中-setBeforePublishPostPostors()和setAfterReceivePostProcessors()。第一个使后处理器能够在发送到RabbitMQ之前立即运行。使用批处理(请参阅批处理)时,将在组装批处理之后以及发送批处理之前调用此方法。收到消息后立即调用第二个。

这些扩展点用于压缩等功能,并为此提供了多个MessagePostProcessor实现。 GZipPostProcessor,ZipPostProcessor和DeflaterPostProcessor在发送前先压缩消息,而GUnzipPostProcessor,UnzipPostProcessor和InflaterPostProcessor则对收到的消息进行解压缩。

从版本2.1.5开始,可以使用copyProperties = true选项配置GZipPostProcessor,以复制原始消息属性。默认情况下,出于性能原因,将重用这些属性,并使用压缩内容编码和可选的MessageProperties.SPRING_AUTO_DECOMPRESS header对其进行修改。如果您保留对原始出站消息的引用,则其属性也会更改。因此,如果您的应用程序使用这些消息后处理器保留了出站消息的副本,请考虑打开copyProperties选项。
类似地,SimpleMessageListenerContainer也具有setAfterReceivePostProcessors()方法,该方法允许在容器接收到消息之后执行解压缩。

从版本2.1.4开始,将addBeforePublishPostPostors()和addAfterReceivePostProcessors()添加到RabbitTemplate中,以允许将新的后处理器分别追加到发布前和接收后处理器的列表中。还提供了删除后处理器的方法。同样,AbstractMessageListenerContainer还添加了addAfterReceivePostProcessors()和removeAfterReceivePostProcessor()方法。有关更多详细信息,请参见RabbitTemplate和AbstractMessageListenerContainer的Javadoc。

3.3.7 请求和回复消息

AmqpTemplate还提供了各种sendAndReceive方法,它们接受与前面针对单向发送操作(交换,routingKey和Message)所述的相同参数选项。这些方法对于请求-答复方案非常有用,因为它们在发送之前处理必需的答复属性的配置,并且可以在为此目的内部创建的排他队列上侦听答复消息。

将MessageConverter应用于请求和答复时,也可以使用类似的请求-答复方法。这些方法被称为convertSendAndReceive。有关更多详细信息,请参见AmqpTemplate的Javadoc。

从1.5.0版开始,每个sendAndReceive方法变体都有一个采用CorrelationData的重载版本。与正确配置的连接工厂一起,这可以使发布者收到操作的发送方确认。有关更多信息,请参见发布者确认和退货以及RabbitOperations的Javadoc。

从2.0版开始,这些方法有一些变体(convertSendAndReceiveAsType),它们带有一个附加的ParameterizedTypeReference参数来转换复杂的返回类型。模板必须配置有SmartMessageConverter。有关更多信息,请参见使用RabbitTemplate从消息转换。

从2.1版开始,您可以使用noLocalReplyConsumer选项配置RabbitTemplate来控制用于回复消费者的noLocal标志。默认情况下为false。

3.3.7.1 回复超时

默认情况下,发送和接收方法在五秒钟后超时并返回null。您可以通过设置replyTimeout属性来修改此行为。从版本1.5开始,如果将强制属性设置为true(或对于特定消息,则强制表达式的计算结果为true),如果无法将消息传递到队列,则会引发AmqpMessageReturnedException。此异常具有returnMessage,replyCode和replyText属性,以及用于发送的exchange和routingKey。

此功能使用发布者的退货。您可以通过在CachingConnectionFactory上将PublisherReturns设置为true来启用它(请参阅发布者确认和返回)。另外,您一定不能在RabbitTemplate中注册自己的ReturnCallback。
从版本2.1.2开始,添加了replyTimedOut方法,使超时可以通知子类,以便它们可以清除任何保留的状态。

从2.0.11和2.1.3版本开始,使用默认的DirectReplyToMessageListenerContainer时,可以通过设置模板的replyErrorHandler属性来添加错误处理程序。对于任何失败的传递,都会调用此错误处理程序,例如延迟答复和收到的没有相关标头的消息。传入的异常是ListenerExecutionFailedException,它具有failMessage属性。

3.3.7.2 RabbitMQ直接回复

从3.4.0版本开始,RabbitMQ服务器支持直接回复。这消除了固定答复队列的主要原因(以避免为每个请求创建临时队列)。从Spring AMQP版本1.4.1开始,默认情况下使用直接回复(如果服务器支持),而不是创建临时回复队列。如果没有提供replyQueue(或者设置了名为amq.rabbitmq.reply-to的名称),RabbitTemplate会自动检测是否支持直接回复,然后使用直接回复还是使用临时回复队列。使用直接答复时,不需要答复侦听器,并且不应对其进行配置。
命名队列(amq.rabbitmq.reply-to除外)仍支持回复侦听器,从而允许控制回复并发等等。

从1.6版开始,如果您希望为每个答复使用一个临时的,排他的,自动删除队列,请将useTemporaryReplyQueues属性设置为true。如果设置了ReplyAddress,则将忽略此属性。

您可以通过将RabbitTemplate子类化并覆盖useDirectReplyTo()来检查其他条件,从而更改指示是否使用直接答复的条件。发送第一个请求时,该方法仅被调用一次。

在2.0版之前,RabbitTemplate为每个请求创建一个新消费者,并在收到答复(或超时)时取消该消费者。现在,该模板改为使用DirectReplyToMessageListenerContainer,让消费者重新使用。模板仍然负责将回复相关联,因此没有延迟回复发送给其他发件人的危险。如果要还原为以前的行为,请将useDirectReplyToContainer(使用XML配置时直接回复到容器)属性设置为false。

AsyncRabbitTemplate没有此类选项。使用直接回复时,它始终使用DirectReplyToContainer进行回复。

3.3.7.3 消息与回复队列的关联

使用固定答复队列(amq.rabbitmq.reply-to除外)时,必须提供相关数据,以便将答复与请求相关。请参阅RabbitMQ远程过程调用(RPC)。默认情况下,标准的correlationId属性用于保存相关数据。但是,如果希望使用自定义属性来保存关联数据,则可以在上设置correlation-key属性。将该属性显式设置为correlationId与省略该属性相同。客户端和服务器必须将相同的标头用于关联数据。

Spring AMQP 1.1版对此数据使用了一个名为spring_reply_correlation的自定义属性。如果您希望使用当前版本恢复到此行为(可能是为了保持与使用1.1的其他应用程序的兼容性),则必须将属性设置为spring_reply_correlation。
默认情况下,模板会生成自己的关联ID(忽略任何用户提供的值)。如果您希望使用自己的关联ID,请将RabbitTemplate实例的userCorrelationId属性设置为true。

相关ID必须唯一,以避免针对请求返回错误回复的可能性。

3.3.7.4 回复侦听容器

当使用3.4.0之前的RabbitMQ版本时,每个答复都会使用一个新的临时队列。但是,可以在模板上配置单个答复队列,这样可以提高效率,还可以在该队列上设置参数。但是,在这种情况下,您还必须提供一个子元素。此元素为答复队列提供一个侦听器容器,其中模板为侦听器。元素上允许上允许的所有消息侦听器容器配置属性,但从模板的配置继承的connection-factory和message-converter除外。

如果您运行应用程序的多个实例或使用多个RabbitTemplate实例,则必须为每个实例使用唯一的答复队列。 RabbitMQ无法从队列中选择消息,因此,如果它们都使用相同的队列,则每个实例都将争夺答复,而不一定会收到自己的消息。

1
2
3
4
5
6
<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory"
reply-queue="replies"
reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>

容器和模板共享连接工厂时,它们不共享通道。因此,请求和答复不在同一事务中执行(如果是事务性的)。
在1.5.0之前的版本中,reply-address属性不可用。始终使用默认交换和答复队列名称作为路由关键字来路由答复。这仍然是默认设置,但是您现在可以指定新的reply-address属性。回复地址可以包含格式为 / 的地址,并且回复被路由到指定的交换机,并被路由到与路由键绑定的队列。回复地址优先于回复队列。当仅使用回复地址时,必须将配置为单独的组件。回复地址和回复队列(或上的queues属性)必须在逻辑上引用同一队列。

通过这种配置,SimpleListenerContainer用于接收答复,其中RabbitTemplate为MessageListener。如上例所示,当使用命名空间元素定义模板时,解析器将模板中的容器和连线定义为侦听器。

当模板不使用固定的ReplyQueue时(或使用直接答复者-参见RabbitMQ直接答复者),则不需要侦听器容器。使用RabbitMQ 3.4.0或更高版本时,首选直接答复机制。

如果将RabbitTemplate定义为或使用@Configuration类将其定义为@Bean,或者在以编程方式创建模板时,则需要自己定义并连接答复侦听器容器。如果您无法执行此操作,则模板将永远不会收到答复,并最终超时并返回null作为对sendAndReceive方法的调用的答复。

从1.5版开始,RabbitTemplate会检测它是否已配置为MessageListener来接收回复。否则,尝试发送和接收带有回复地址的消息失败,并出现IllegalStateException(因为从未收到回复)。

此外,如果使用简单的ReplyAddress(队列名称),则回复侦听器容器将验证它正在侦听具有相同名称的队列。如果回复地址是交换和路由密钥,并且写入了调试日志消息,则无法执行此检查。

自己连接回复侦听器和模板时,务必确保模板的ReplyAddress和容器的队列(或queueNames)属性引用相同的队列。该模板将回复地址插入到出站邮件的ReplyTo属性中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory" />
<property name="exchange" value="foo.exchange" />
<property name="routingKey" value="foo" />
<property name="replyQueue" ref="replyQ" />
<property name="replyTimeout" value="600000" />
<property name="useDirectReplyToContainer" value="false" />
</bean>

<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<constructor-arg ref="connectionFactory" />
<property name="queues" ref="replyQ" />
<property name="messageListener" ref="amqpTemplate" />
</bean>

<rabbit:queue id="replyQ" name="my.reply.queue" />
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyAddress(replyQueue().getName());
rabbitTemplate.setReplyTimeout(60000);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}

@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}

@Bean
public Queue replyQueue() {
return new Queue("my.reply.queue");
}

在此测试用例中,显示了一个RabbitTemplate的完整示例,该示例连接了固定的答复队列以及处理请求并返回答复的“远程”侦听器容器。

当回复超时(replyTimeout)时,sendAndReceive()方法将返回null。

在1.3.6版之前,仅记录对超时消息的最新答复。现在,如果收到延迟答复,则拒绝该答复(模板将引发AmqpRejectAndDontRequeueException)。如果将答复队列配置为将拒绝的邮件发送到死信交换,则可以检索答复以进行以后的分析。为此,请使用与回复队列名称相同的路由键将队列绑定到已配置的死信交换。

有关配置无效字母的更多信息,请参见RabbitMQ无效字母文档。您还可以查看FixedReplyQueueDeadLetterTests测试用例的示例。

3.3.7.5 AsyncRabbitTemplate

1.6版引入了AsyncRabbitTemplate。这具有与AmqpTemplate上相似的sendAndReceive(和convertSendAndReceive)方法。但是,它们不是阻塞而是返回一个ListenableFuture。

sendAndReceive方法返回RabbitMessageFuture。 convertSendAndReceive方法返回RabbitConverterFuture。

您可以稍后通过将来调用get()来同步检索结果,也可以注册一个与结果异步调用的回调。下面的清单显示了两种方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Autowired
private AsyncRabbitTemplate template;

...

public void doSomeWorkAndGetResultLater() {

...

ListenableFuture<String> future = this.template.convertSendAndReceive("foo");

// do some more work

String reply = null;
try {
reply = future.get();
}
catch (ExecutionException e) {
...
}

...

}

public void doSomeWorkAndGetResultAsync() {

...

RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
future.addCallback(new ListenableFutureCallback<String>() {

@Override
public void onSuccess(String result) {
...
}

@Override
public void onFailure(Throwable ex) {
...
}

});

...

}

如果设置了mandatory,并且消息无法传递,则将来会引发ExecutionException,其原因是AmqpMessageReturnedException,该异常封装了返回的消息和有关返回的信息。

如果设置了enableConfirms,则future将具有一个名为Confirm的属性,该属性本身是一个ListenableFuture ,其中true表示成功发布。如果确认未来为假,RabbitFuture具有另一个名为nackCause的属性,其中包含失败的原因(如果有)。

如果在回复后收到确认,则发布者确认将被丢弃,因为回复表示成功发布。
您可以在模板上设置receiveTimeout属性以使答复超时(默认为30000-30秒)。如果发生超时,则将来会通过AmqpReplyTimeoutException完成。

该模板实现SmartLifecycle。在有挂起的答复时停止模板会导致挂起的Future实例被取消。

从2.0版开始,异步模板现在支持直接答复而不是配置的答复队列。要启用此功能,请使用以下构造函数之一:

1
2
3
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)

public AsyncRabbitTemplate(RabbitTemplate template)

请参阅RabbitMQ直接答复以将直接答复与同步RabbitTemplate一起使用。

2.0版引入了这些方法的变体(convertSendAndReceiveAsType),这些变体采用附加的ParameterizedTypeReference参数来转换复杂的返回类型。您必须使用SmartMessageConverter配置基础RabbitTemplate。有关更多信息,请参见使用RabbitTemplate从消息转换。

3.3.7.6 Spring Remoting with AMQP

Spring框架具有常规的远程处理功能,允许使用各种传输方式的远程过程调用(RPC)。 Spring-AMQP通过客户端上的AmqpProxyFactoryBean和服务器上的AmqpInvokerServiceExporter支持类似的机制。这提供了基于AMQP的RPC。在客户端,如前所述使用RabbitTemplate。在服务器端,调用程序(配置为MessageListener)接收消息,调用配置的服务,并使用入站消息的ReplyTo信息返回答复。

您可以将客户端工厂bean注入任何bean(通过使用其serviceInterface)。然后,客户端可以调用代理上的方法,从而通过AMQP进行远程执行。

对于默认的MessageConverter实例,方法参数和返回值必须是Serializable实例。
在服务器端,AmqpInvokerServiceExporter同时具有AmqpTemplate和MessageConverter属性。当前,未使用模板的MessageConverter。如果需要提供自定义消息转换器,则应通过设置messageConverter属性来提供它。在客户端,您可以向AmqpTemplate添加自定义消息转换器,该转换器通过使用其amqpTemplate属性提供给AmqpProxyFactoryBean。

以下清单显示了示例客户端和服务器配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

<bean id="client"
class="org.springframework.amqp.remoting.client.AmqpProxyFactoryBean">
<property name="amqpTemplate" ref="template" />
<property name="serviceInterface" value="foo.ServiceInterface" />
</bean>

<rabbit:connection-factory id="connectionFactory" />

<rabbit:template id="template" connection-factory="connectionFactory" reply-timeout="2000"
routing-key="remoting.binding" exchange="remoting.exchange" />

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:queue name="remoting.queue" />

<rabbit:direct-exchange name="remoting.exchange">
<rabbit:bindings>
<rabbit:binding queue="remoting.queue" key="remoting.binding" />
</rabbit:bindings>
</rabbit:direct-exchange>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

<bean id="listener"
class="org.springframework.amqp.remoting.service.AmqpInvokerServiceExporter">
<property name="serviceInterface" value="foo.ServiceInterface" />
<property name="service" ref="service" />
<property name="amqpTemplate" ref="template" />
</bean>

<bean id="service" class="foo.ServiceImpl" />

<rabbit:connection-factory id="connectionFactory" />

<rabbit:template id="template" connection-factory="connectionFactory" />

<rabbit:queue name="remoting.queue" />

<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="listener" queue-names="remoting.queue" />
</rabbit:listener-container>

AmqpInvokerServiceExporter只能处理格式正确的消息,例如从AmqpProxyFactoryBean发送的消息。如果收到无法解释的消息,则会发送序列化的RuntimeException作为答复。如果邮件没有replyToAddress属性,则如果未配置死信交换,则邮件将被拒绝并永久丢失。

默认情况下,如果无法传递请求消息,则调用线程最终超时,并引发RemoteProxyFailureException。默认情况下,超时为五秒。您可以通过在RabbitTemplate上设置ReplyTimeout属性来修改该持续时间。从版本1.5开始,将强制性属性设置为true并在连接工厂上启用返回(请参见发布者确认和返回),调用线程将引发AmqpMessageReturnedException。有关更多信息,请参见回复超时。

3.3.8 配置Broker

AMQP规范描述了如何使用协议在代理上配置队列,交换和绑定。这些操作(可从0.8规范及更高版本移植)在org.springframework.amqp.core软件包的AmqpAdmin界面中。该类的RabbitMQ实现是RabbitAdmin,位于org.springframework.amqp.rabbit.core包中。

AmqpAdmin界面基于使用Spring AMQP域抽象,并在以下列表中显示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

public interface AmqpAdmin {

// Exchange Operations

void declareExchange(Exchange exchange);

void deleteExchange(String exchangeName);

// Queue Operations

Queue declareQueue();

String declareQueue(Queue queue);

void deleteQueue(String queueName);

void deleteQueue(String queueName, boolean unused, boolean empty);

void purgeQueue(String queueName, boolean noWait);

// Binding Operations

void declareBinding(Binding binding);

void removeBinding(Binding binding);

Properties getQueueProperties(String queueName);

}

getQueueProperties()方法返回有关队列的一些有限信息(消息计数和消费者计数)。返回的属性的键在RabbitTemplate中可以作为常量使用(QUEUE_NAME,QUEUE_MESSAGE_COUNT和QUEUE_CONSUMER_COUNT)。 RabbitMQ REST API在QueueInfo对象中提供了更多信息。

no-argclarifyQueue()方法使用自动生成的名称定义代理上的队列。此自动生成的队列的其他属性是Exclusive = true,autoDelete = true和持久= false。

defineQueue(Queue queue)方法接受一个Queue对象,并返回已声明队列的名称。如果提供的Queue的name属性为空字符串,则代理将使用生成的名称声明队列。该名称将返回给呼叫者。该名称也被添加到队列的actualName属性中。您只能通过直接调用RabbitAdmin来以编程方式使用此功能。在应用程序上下文中以声明方式定义队列时,当管理员使用自动声明时,可以将name属性设置为“”(空字符串)。然后,代理创建名称。从2.1版开始,侦听器容器可以使用这种类型的队列。有关更多信息,请参见以容器和代理命名的队列。

这与AnonymousQueue相反,在AnonymousQueue中,框架生成唯一(UUID)名称,并将持久性设置为false和Exclusive,将autoDelete设置为true。名称属性为空(或缺少)的始终会创建一个AnonymousQueue。

请参阅AnonymousQueue以了解为什么AnonymousQueue优先于代理生成的队列名称,以及如何控制名称格式。从版本2.1开始,默认情况下,匿名队列使用参数x-queue-master-locator声明为本地客户端声明。这样可以确保在与应用程序连接的节点上声明队列。声明性队列必须具有固定的名称,因为它们可能在上下文中的其他地方被引用,例如在以下示例中所示的侦听器中:

1
2
3
<rabbit:listener-container>
<rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>

该接口的RabbitMQ实现是RabbitAdmin,当使用Spring XML进行配置时,它类似于以下示例:

1
2
3
4

<rabbit:connection-factory id="connectionFactory"/>

<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>

当CachingConnectionFactory缓存模式为CHANNEL(默认)时,RabbitAdmin实现将自动延迟声明在同一ApplicationContext中声明的队列,交换和绑定。一旦向代理打开连接,便声明这些组件。有一些名称空间功能使此操作非常方便。例如,在Stocks示例应用程序中,我们具有以下功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

<rabbit:queue id="tradeQueue"/>

<rabbit:queue id="marketDataQueue"/>

<fanout-exchange name="broadcast.responses"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="tradeQueue"/>
</bindings>
</fanout-exchange>

<topic-exchange name="app.stock.marketdata"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/>
</bindings>
</topic-exchange>

在前面的示例中,我们使用匿名队列(实际上,在内部,只是具有由框架而不是由代理生成的名称的队列),并通过ID引用它们。我们还可以使用显式名称声明队列,这些队列还用作上下文中其bean定义的标识符。以下示例使用明确的名称配置队列:

1
<rabbit:queue name="stocks.trade.queue"/>

您可以提供id和name属性。这使您可以通过独立于队列名称的ID来引用队列(例如,在绑定中)。它还允许使用标准的Spring功能(例如队列名的属性占位符和SpEL表达式)。当您使用名称作为Bean标识符时,这些功能不可用。

默认情况下,当发生任何异常时,RabbitAdmin将立即停止处理所有声明。这可能会导致下游问题,例如侦听器容器无法初始化,因为未声明另一个队列(在错误的一个之后定义)。

可以通过在RabbitAdmin实例上将ignore-declaration-exceptions属性设置为true来修改此行为。该选项指示RabbitAdmin记录异常并继续声明其他元素。使用Java配置RabbitAdmin时,此属性称为ignoreDeclarationExceptions。这是适用于所有元素的全局设置。队列,交换和绑定具有类似的属性,仅适用于那些元素。

在1.6版之前的版本中,仅当通道上发生IOException时(例如,当前属性与所需属性不匹配时),此属性才生效。现在,此属性对任何异常(包括TimeoutException和其他异常)都生效。

另外,任何声明异常都会导致产生一个DeclarationExceptionEvent,这是一个ApplicationEvent,可以由上下文中的任何ApplicationListener消费。该事件包含对admin,已声明的元素和Throwable的引用。

3.3.8.1 Header 交换器

从版本1.3开始,您可以配置HeadersExchange以匹配多个Header。您也可以指定是否必须匹配任何Header。以下示例显示了如何执行此操作:

1
2
3
4
5
6
7
8
9
10
11
<rabbit:headers-exchange name="headers-test">
<rabbit:bindings>
<rabbit:binding queue="bucket">
<rabbit:binding-arguments>
<entry key="foo" value="bar"/>
<entry key="baz" value="qux"/>
<entry key="x-match" value="all"/>
</rabbit:binding-arguments>
</rabbit:binding>
</rabbit:bindings>
</rabbit:headers-exchange>

从版本1.6开始,您可以使用内部标志(默认为false)配置Exchange,并且可以通过RabbitAdmin在Broker上正确配置这样的Exchange(如果应用程序上下文中存在Exchange)。如果内部标志适用于交换,则RabbitMQ不允许客户端使用交换。这对于无用信交换或交换对交换绑定很有用,在这种情况下,您不希望发行人直接使用交换。

要查看如何使用Java配置AMQP基础结构,请查看Stock示例应用程序,其中有@Configuration类AbstractStockRabbitConfiguration,该类又具有RabbitClientConfiguration和RabbitServerConfiguration子类。以下清单显示了AbstractStockRabbitConfiguration的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

@Configuration
public abstract class AbstractStockAppRabbitConfiguration {

@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}

@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jsonMessageConverter());
configureRabbitTemplate(template);
return template;
}

@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}

@Bean
public TopicExchange marketDataExchange() {
return new TopicExchange("app.stock.marketdata");
}

// additional code omitted for brevity

}

客户端通过AmqpAdmin上的clarifyQueue()方法声明另一个队列。它使用属性文件中外部化的路由模式将该队列绑定到市场数据交换。

3.3.8.2 Builder API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 
@Bean
public Queue queue() {
return QueueBuilder.nonDurable("foo")
.autoDelete()
.exclusive()
.withArgument("foo", "bar")
.build();
}

@Bean
public Exchange exchange() {
return ExchangeBuilder.directExchange("foo")
.autoDelete()
.internal()
.withArgument("foo", "bar")
.build();
}

3.3.8.3 声明交换、队列、绑定的集合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Configuration
public static class Config {

@Bean
public ConnectionFactory cf() {
return new CachingConnectionFactory("localhost");
}

@Bean
public RabbitAdmin admin(ConnectionFactory cf) {
return new RabbitAdmin(cf);
}

@Bean
public DirectExchange e1() {
return new DirectExchange("e1", false, true);
}

@Bean
public Queue q1() {
return new Queue("q1", false, false, true);
}

@Bean
public Binding b1() {
return BindingBuilder.bind(q1()).to(e1()).with("k1");
}

@Bean
public Declarables es() {
return new Declarables(
new DirectExchange("e2", false, true),
new DirectExchange("e3", false, true));
}

@Bean
public Declarables qs() {
return new Declarables(
new Queue("q2", false, false, true),
new Queue("q3", false, false, true));
}

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Declarables prototypes() {
return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
}

@Bean
public Declarables bs() {
return new Declarables(
new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
new Binding("q3", DestinationType.QUEUE, "e3", "k3", null));
}

@Bean
public Declarables ds() {
return new Declarables(
new DirectExchange("e4", false, true),
new Queue("q4", false, false, true),
new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
}

}

2.2版将getDeclarablesByType方法添加到了Declarables中。例如,在声明一个或多个侦听器容器bean时,这可以方便使用。

1
2
3
4
5
6
7
8
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
Declarables mixedDeclarables, MessageListener listener) {

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(mixedDeclarables.getDeclarablesByType(Queue.class).toArray(new Queue[0]));
container.setMessageListener(listener);
return container;
}

3.3.8.4 条件声明

默认情况下,所有队列,交换和绑定都由应用程序上下文中的所有RabbitAdmin实例声明(假设它们具有auto-startup =“ true”)。

从版本2.1.9开始,RabbitAdmin具有新的属性explicitDeclarationsOnly(默认情况下为false)。当将此设置为true时,管理员将仅声明显式配置为由该RabbitAdmin声明的Bean。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

@Bean
public RabbitAdmin admin1() {
return new RabbitAdmin(cf1());
}

@Bean
public RabbitAdmin admin2() {
return new RabbitAdmin(cf2());
}

@Bean
public Queue queue() {
Queue queue = new Queue("foo");
queue.setAdminsThatShouldDeclare(admin1());
return queue;
}

@Bean
public Exchange exchange() {
DirectExchange exchange = new DirectExchange("bar");
exchange.setAdminsThatShouldDeclare(admin1());
return exchange;
}

@Bean
public Binding binding() {
Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
binding.setAdminsThatShouldDeclare(admin1());
return binding;
}

3.3.8.5 关于id和name属性的注释

和元素上的name属性反映代理中实体的名称。对于队列,如果省略名称,则会创建一个匿名队列(请参阅AnonymousQueue)。

在2.0之前的版本中,该名称也被注册为Bean名称别名(类似于元素上的名称)。

这导致了两个问题:

  • 它阻止了队列的声明和相同名称的交换。

  • 如果别名包含SpEL表达式(#{…}),则无法解析。

从2.0版开始,如果您同时使用id和name属性声明这些元素之一,则该名称不再声明为Bean名称别名。如果要声明队列并使用相同的名称进行交换,则必须提供一个ID。

如果元素只有name属性,则没有任何变化。仍然可以使用名称Bean来引用该Bean,例如在绑定声明中。但是,如果名称包含SpEL,您仍然无法引用它-您必须提供ID以供参考。

3.3.8.6 AnonymousQueue

通常,当您需要一个唯一命名的排他性自动删除队列时,建议您使用AnonymousQueue而不是broker定义的队列名称(使用“”作为队列名称会导致代理生成队列名称)。

这是因为:

建立与broker的连接时,实际上会声明队列。在创建和将bean连接在一起很长时间之后。使用队列的Bean需要知道其名称。实际上,启动应用程序时,broker甚至可能没有运行。

如果与broker的连接由于某种原因而丢失,则管理员将用相同的名称重新声明AnonymousQueue。如果使用broker声明的队列,则队列名称将更改。

您可以控制AnonymousQueue实例使用的队列名称的格式。

默认情况下,队列名称以spring.gen-为前缀,后跟UUID的base64表示形式,例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g。

您可以在构造函数参数中提供AnonymousQueue.NamingStrategy实现。以下示例显示了如何执行此操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Bean
public Queue anon1() {
return new AnonymousQueue();
}

@Bean
public Queue anon2() {
return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("something-"));
}

@Bean
public Queue anon3() {
return new AnonymousQueue(AnonymousQueue.UUIDNamingStrategy.DEFAULT);
}

第一个bean生成一个队列名称,该队列名称以spring.gen-为前缀,后跟UUID的base64表示形式-例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g。第二个bean生成一个以某些东西为前缀的队列名称,后跟UUID的base64表示形式。第三个bean仅通过使用UUID(不进行base64转换)来生成名称,例如f20c818a-006b-4416-bf91-643590fedb0e。

base64编码使用RFC 4648中的“ ​​URL和文件名安全字母”。尾随的填充字符(=)被删除。

您可以提供自己的命名策略,从而可以在队列名称中包括其他信息(例如应用程序名称或客户端主机)。

使用XML配置时,可以指定命名策略。 naming-strategy属性存在于实现AnonymousQueue.NamingStrategy的bean引用的元素上。以下示例显示如何以各种方式指定命名策略:

1
2
3
4
5
6
7
8
9
10
11
<rabbit:queue id="uuidAnon" />

<rabbit:queue id="springAnon" naming-strategy="uuidNamer" />

<rabbit:queue id="customAnon" naming-strategy="customNamer" />

<bean id="uuidNamer" class="org.springframework.amqp.core.AnonymousQueue.UUIDNamingStrategy" />

<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
<constructor-arg value="custom.gen-" />
</bean>

第一个示例创建名称,例如spring.gen-MRBv9sqISkuCiPfOYfpo4g。第二个示例使用UUID的字符串表示形式创建名称。第三个示例创建名称,例如custom.gen-MRBv9sqISkuCiPfOYfpo4g。

您还可以提供自己的命名策略bean。

从版本2.1开始,默认情况下,匿名队列使用参数x-queue-master-locator声明为本地客户端声明。这样可以确保在与应用程序连接的节点上声明队列。构造实例后,可以通过调用queue.setMasterLocator(null)恢复到以前的行为。

3.3.9 Broker事件监听

启用事件交换插件后,如果将类型为BrokerEventListener的bean添加到应用程序上下文中,它将发布选定的代理事件作为BrokerEvent实例,可以通过常规的Spring ApplicationListener或@EventListener方法使用它。代理将事件发布到主题交换amq.rabbitmq.event,每种事件类型具有不同的路由键。侦听器使用事件密钥,该事件密钥用于将AnonymousQueue绑定到交换,因此侦听器仅接收选定的事件。由于这是一个主题交换,因此可以使用通配符(以及显式请求特定事件),如以下示例所示:

1
2
3
4
@Bean
public BrokerEventListener eventListener() {
return new BrokerEventListener(connectionFactory(), "user.deleted", "channel.#", "queue.#");
}

您可以使用正常的Spring技术进一步缩小单个事件侦听器中接收到的事件的范围,如以下示例所示:

1
2
3
4
@EventListener(condition = "event.eventType == 'queue.created'")
public void listener(BrokerEvent event) {
...
}

3.3.10 延迟消息交换器

该插件目前被标记为实验性,但已经可用了一年多的时间(在撰写本文时)。如果需要更改插件,我们计划在可行的情况下尽快添加对此类更改的支持。因此,Spring AMQP中的这种支持也应视为试验性的。此功能已通过RabbitMQ 3.6.0和插件的0.0.1版进行了测试。

要使用RabbitAdmin将交换声明为延迟,可以将交换bean的delay属性设置为true。 RabbitAdmin使用交换类型(直接,扇出等)来设置x-delayed-type参数,并使用x-delayed-message类型声明交换。

使用XML配置交换bean时,delayed属性(默认值:false)也可用。以下示例显示了如何使用它:

1
<rabbit:topic-exchange name="topic" delayed="true" />

要发送延迟的消息,可以通过MessageProperties设置x-delay标头,如以下示例所示:

1
2
3
4
MessageProperties properties = new MessageProperties();
properties.setDelay(15000);
template.send(exchange, routingKey,
MessageBuilder.withBody("foo".getBytes()).andProperties(properties).build());
1
2
3
4
5
6
7
8
9
rabbitTemplate.convertAndSend(exchange, routingKey, "foo", new MessagePostProcessor() {

@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(15000);
return message;
}

});

若要检查消息是否延迟,请在MessageProperties上使用getReceivedDelay()方法。这是一个单独的属性,可以避免意外传播到由输入消息生成的输出消息。

3.3.11 RabbitMQ Rest API

启用管理插件后,RabbitMQ服务器将公开REST API来监视和配置代理。现在提供了API的Java绑定。 com.rabbitmq.http.client.Client是一个标准的,即时的并且因此是阻塞的API。它基于Spring Web模块及其RestTemplate实现。另一方面,com.rabbitmq.http.client.ReactorNettyClient是基于Reactor Netty项目的反应性,非阻塞实现。

跳相关性(com.rabbitmq:http-client)现在也是可选的。

有关更多信息,请参见其Javadoc。

3.3.12 异常处理

RabbitMQ Java客户端的许多操作都可能引发已检查的异常。例如,在很多情况下,可能会抛出IOException实例。 RabbitTemplate,SimpleMessageListenerContainer和其他Spring AMQP组件捕获这些异常,并将它们转换为AmqpException层次结构中的异常之一。这些在“ org.springframework.amqp”包中定义,AmqpException是层次结构的基础。

当侦听器引发异常时,它将包装在ListenerExecutionFailedException中。通常,消息被代理拒绝并重新排队。将defaultRequeueRejected设置为false会导致消息被丢弃(或路由到无效信件交换)。如消息侦听器和异步案例中所述,侦听器可以引发AmqpRejectAndDontRequeueException(或InstantRequeueAmqpException)来有条件地控制此行为。

但是,存在一类错误,侦听器无法控制该行为。当遇到无法转换的消息(例如,无效的content_encoding标头)时,在消息到达用户代码之前会引发一些异常。将defaultRequeueRejected设置为true(默认值)(或引发InstantRequeueAmqpException),此类消息将一遍又一遍地传递。在版本1.3.2之前,用户需要编写自定义ErrorHandler(如异常处理中所述),以避免这种情况。

从1.3.2版开始,默认的ErrorHandler现在是ConditionalRejectingErrorHandler,它拒绝(并且不重新排队)由于不可恢复的错误而失败的消息。具体来说,它拒绝失败并出现以下错误的消息:

o.s.amqp … MessageConversionException:使用MessageConverter转换传入消息有效负载时可以抛出此异常。

o.s.messaging … MessageConversionException:如果映射到@RabbitListener方法时需要额外的转换,则转换服务可以抛出该异常。

o.s.messaging … MethodArgumentNotValidException:如果在侦听器中使用了验证(例如@Valid),则可以引发此异常。

o.s.messaging … MethodArgumentTypeMismatchException:如果将入站消息转换为与目标方法不正确的类型,则可以引发该异常。例如,将该参数声明为Message ,但收到Message 。

java.lang.NoSuchMethodException:在版本1.6.3中添加。

java.lang.ClassCastException:在版本1.6.3中添加。

您可以使用FatalExceptionStrategy配置此错误处理程序的实例,以便用户可以提供自己的条件消息拒绝规则-例如,从Spring Retry(消息侦听器和异步案例)到BinaryExceptionClassifier的委托实现。另外,ListenerExecutionFailedException现在具有您可以在决策中使用的failedMessage属性。如果FatalExceptionStrategy.isFatal()方法返回true,则错误处理程序将引发AmqpRejectAndDontRequeueException。当确定异常为致命时,默认的FatalExceptionStrategy会记录一条警告消息。

从1.6.3版开始,将用户异常添加到致命列表的便捷方法是子类ConditionalRejectingErrorHandler.DefaultExceptionStrategy并重写isUserCauseFatal(Throwable cause)方法以为致命异常返回true。

处理DLQ消息的常见模式是在这些消息以及其他DLQ配置上设置生存时间,以使这些消息过期并路由回主队列以重试。这种技术的问题在于,导致致命异常的消息会永远循环。从2.1版开始,ConditionalRejectingErrorHandler在消息上检测到x-death标头,该标头导致引发致命异常。该消息已记录并被丢弃。您可以通过将ConditionalRejectingErrorHandler上的rejectFatalsWithXDeath属性设置为false来还原为以前的行为。

从版本2.1.9开始,即使容器确认模式为MANUAL,默认情况下,具有这些致命异常的消息也将被拒绝并且不重新排队。这些异常通常在调用侦听器之前发生,因此侦听器没有机会确认或拒绝消息,因此消息以未确认状态保留在队列中。若要还原为以前的行为,请将ConditionalRejectingErrorHandler的rejectManual属性设置为false。

3.3.13 事务

Spring Rabbit框架支持同步和异步用例中的自动事务管理,具有许多不同的语义,可以通过声明方式选择这些语义,这是Spring事务的现有用户所熟悉的。这使许多(如果不是大多数)常见的消息传递模式易于实现。

有两种方法可以向框架发出所需的事务语义。在RabbitTemplate和SimpleMessageListenerContainer中,都有一个标志channelTransacted,如果为true,则告诉框架使用事务性通道并以提交或回滚(取决于结果)结束所有操作(发送或接收),但有例外表示回滚。另一个信号是使用Spring的PlatformTransactionManager实现之一提供外部事务,作为正在进行的操作的上下文。如果在框架发送或接收消息时已经有事务在进行中,并且channelTransacted标志为true,则将消息事务的提交或回滚推迟到当前事务结束为止。如果channelTransacted标志为false,则没有事务语义适用于消息传递操作(它是自动确认的)。

channelTransacted标志是配置时间设置。创建AMQP组件时,通常在应用程序启动时声明和处理一次。原则上,外部事务是动态的,因为系统在运行时响应当前线程状态。但是,实际上,当将事务以声明方式分层到应用程序时,它通常也是配置设置。

对于RabbitTemplate的同步用例,外部事务由调用方根据喜好以声明性或强制性方式提供(通常的Spring事务模型)。以下示例显示了一种声明性方法(通常首选,因为它是非侵入性的),其中已使用channelTransacted = true配置模板:

1
2
3
4
5
6
7
@Transactional
public void doSomething() {
String incoming = rabbitTemplate.receiveAndConvert();
// do some more database processing...
String outgoing = processInDatabaseAndExtractReply(incoming);
rabbitTemplate.convertAndSend(outgoing);
}

在前面的示例中,在标记为@Transactional的方法内,String有效负载作为消息正文被接收,转换和发送。如果数据库处理失败并发生异常,则将传入消息返回给代理,并且不发送传出消息。这适用于在交易方法链内使用RabbitTemplate进行的任何操作(例如,除非直接操纵Channel尽早提交交易)。

对于带有SimpleMessageListenerContainer的异步用例,如果需要外部事务,则容器在设置侦听器时必须请求它。为了表明需要外部事务,在配置容器时,用户向容器提供PlatformTransactionManager的实现。以下示例显示了如何执行此操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
public class ExampleExternalTransactionAmqpConfiguration {

@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}

}

在前面的示例中,添加了事务管理器作为从另一个bean定义(未显示)注入的依赖项,并且channelTransacted标志也设置为true。效果是,如果侦听器因异常而失败,则事务将回滚,并且消息也将返回给代理。重要的是,如果事务提交失败(例如,由于数据库约束错误或连接性问题),则AMQP事务也会回滚,并将消息返回给代理。这有时被称为“尽力而为第一阶段提交”,并且是可靠消息传递的非常强大的模式。如果在前面的示例中将channelTransacted标志设置为false(默认值),则仍将为侦听器提供外部事务,但是所有消息传递操作将被自动确认,因此效果是即使在服务器上也提交消息传递操作。业务运营的回滚。

3.3.13.1 条件退回

在1.6.6版之前,使用外部事务管理器(例如JDBC)在容器的transactionAttribute中添加回滚规则无效。异常总是回滚事务。

另外,在容器的建议链中使用事务建议时,条件回滚并不是很有用,因为所有侦听器异常都包装在ListenerExecutionFailedException中。

第一个问题已得到纠正,现在可以正确应用规则。此外,现在提供了ListenerFailedRuleBasedTransactionAttribute。它是RuleBasedTransactionAttribute的子类,唯一的区别是它知道ListenerExecutionFailedException并将该异常的原因用于规则。该交易属性可以直接在容器中使用,也可以通过交易建议使用。

以下示例使用此规则:

1
2
3
4
5
6
7
8
9
10
11
@Bean
public AbstractMessageListenerContainer container() {
...
container.setTransactionManager(transactionManager);
RuleBasedTransactionAttribute transactionAttribute =
new ListenerFailedRuleBasedTransactionAttribute();
transactionAttribute.setRollbackRules(Collections.singletonList(
new NoRollbackRuleAttribute(DontRollBackException.class)));
container.setTransactionAttribute(transactionAttribute);
...
}

3.3.13.2 关于回滚收到消息的注意事项

AMQP事务仅适用于发送给代理的消息和确认。因此,当发生Spring事务的回滚并且已经收到消息时,Spring AMQP不仅必须回滚该事务,而且还必须手动拒绝该消息(有点小问题,但这不是规范所称的)。拒绝消息时采取的操作与事务无关,并且取决于defaultRequeueRejected属性(默认值:true)。有关拒绝失败消息的更多信息,请参见消息侦听器和异步案例。

有关RabbitMQ事务及其限制的更多信息,请参见RabbitMQ Broker Semantics。

在RabbitMQ 2.7.0之前,此类消息(以及通道关闭或中止时未确认的消息)在Rabbit代理上进入队列的后面。从2.7.0版本开始,被拒绝的消息以与JMS回滚消息类似的方式进入队列的最前面。
以前,本地事务之间和提供TransactionManager时,事务回滚时的消息重新排队不一致。在前一种情况下,将应用常规的重新排队逻辑(AmqpRejectAndDontRequeueException或defaultRequeueRejected = false)(请参阅消息侦听器和异步情况)。使用事务管理器,该消息将在回滚时无条件重新排队。从版本2.0开始,行为是一致的,并且在两种情况下均采用常规的重新排队逻辑。要恢复以前的行为,可以将容器的alwaysRequeueWithTxManagerRollback属性设置为true。请参阅消息侦听器容器配置。

3.3.13.3 使用RabbitTransactionManager

RabbitTransactionManager是在外部事务中执行并与外部事务同步的Rabbit操作的替代方法。该事务管理器是PlatformTransactionManager接口的实现,应与单个Rabbit ConnectionFactory一起使用。

此策略不能提供XA事务,例如,以便在消息传递和数据库访问之间共享事务。
需要应用程序代码才能通过ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory,boolean)(而不是带有后续通道创建的标准Connection.createChannel())来检索事务型Rabbit资源。使用Spring AMQP的RabbitTemplate时,它将自动检测线程绑定的Channel并自动参与其事务。

使用Java配置,可以使用以下bean来设置新的RabbitTransactionManager:

1
2
3
4
@Bean
public RabbitTransactionManager rabbitTransactionManager() {
return new RabbitTransactionManager(connectionFactory);
}

如果您喜欢XML配置,则可以在XML Application Context文件中声明以下bean:

1
2
3
4
<bean id="rabbitTxManager"
class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>

3.3.14 消息监听容器配置

用于配置与事务和服务质量相关的SimpleMessageListenerContainer(SMLC)和DirectMessageListenerContainer(DMLC)的选项很多,它们中的一些相互交互。适用于SMLC或DMLC的属性由相应列中的复选标记指示。请参阅选择容器,以获取帮助您确定哪个容器适合您的应用程序的信息。

下表显示了使用命名空间配置时的容器属性名称及其等效属性名称(在括号中)。该元素上的type属性可以是简单的(默认),也可以直接指定SMLC或DMLC。命名空间未公开某些属性。这些由属性的N / A表示。

3.3.15 监听并发

3.3.15.1 SimpleMessageListenerContainer

默认情况下,侦听器容器启动一个消费者,该消费者从队列中接收消息。

检查上一节中的表时,您可以看到许多控制并发性的属性。最简单的是currentsconsumers,它创建并发处理消息的(固定)数量的消费者。

在1.3.0版之前,这是唯一可用的设置,必须停止容器并再次启动容器才能更改设置。

从1.3.0版开始,您现在可以动态调整parallelConsumers属性。如果在容器运行时更​​改了容器,则会根据需要添加或删除消费者,以适应新设置。

另外,添加了一个名为maxConcurrentConsumers的新属性,并且该容器根据工作负载动态调整了并发性。这与四个附加属性一起工作:ContinuousActiveTrigger,startConsumerMinInterval,continuousIdleTrigger和stopConsumerMinInterval。


使用默认设置,增加使用方的算法如下:

如果尚未达到maxConcurrentConsumers且现有消费者处于连续十个周期的活动状态,并且自启动最后一个消费者以来至少经过了10秒钟,则将启动一个新消费者。如果消费者以batchSize * receiveTimeout毫秒接收至少一条消息,则认为该消费者是活动的。



在默认设置下,减少使用方的算法如下:

如果运行的并发消费者数量超过了,并且消费者检测到十个连续超时(空闲),并且最后一个消费者至少在60秒前停止,则该消费者将停止。超时取决于receiveTimeout和batchSize属性。如果消费者未接收到batchSize * receiveTimeout毫秒中的消息,则被视为空闲。因此,在默认超时(一秒)和batchSize为4的情况下,在40秒的空闲时间后考虑停止消费者(四个超时对应于一个空闲检测)。


实际上,只有在整个容器闲置一段时间后才可以停止使用。这是因为broker在所有活跃的消费者中共享其工作。
每个消费者都使用一个通道,而不管配置的队列数量如何。

从2.0版开始,可以使用并发属性(例如2-4)设置parallelConsumers和maxConcurrentConsumers属性。

3.3.15.2 使用 DirectMessageListenerContainer

使用此容器,并发性基于配置的队列和consumersPerQueue。每个队列的每个消费者都使用一个单独的通道,并发性由Rabbit客户端库控制。默认情况下,在编写本文时,它使用DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors()* 2个线程的池。

您可以配置taskExecutor以提供所需的最大并发性。

3.3.16 排他消费者

从版本1.3开始,您可以使用单个独占消费者配置侦听器容器。这样可以防止其他容器从队列中使用,直到当前使用方被取消为止。这样的容器的并发性必须为1。

3.3.17 监听容器队列

1.3版引入了许多改进,用于处理侦听器容器中的多个队列。

必须将容器配置为侦听至少一个队列。以前也是如此,但是现在可以在运行时添加和删除队列。处理完任何预提取的消息后,容器将回收(取消并重新创建)消费者。有关addQueues,addQueueNames,removeQueues和removeQueueNames方法,请参见Javadoc。删除队列时,必须至少保留一个队列。

现在,如果消费者的任何队列可用,便会启动。以前,如果没有任何队列,容器将停止。现在,只有在没有可用队列的情况下才是这种情况。如果不是所有队列都可用,则容器会尝试每60秒被动声明(并消耗)丢失的队列。

同样,如果消费者从代理收到取消通知(例如,如果删除队列),则该消费者尝试恢复,并且恢复的消费者继续处理来自任何其他已配置队列的消息。以前,取消一个队列会取消整个消费者,最终,由于缺少队列,容器将停止。

如果要永久删除队列,则应在删除队列之前或之后更新容器,以避免将来尝试从中使用该容器。

3.3.18 弹性:从错误和broker失败中恢复

Spring AMQP提供的一些关键(也是最流行的)高级功能与协议错误或代理失败时的恢复和自动重新连接有关。我们已经在本指南中看到了所有相关组件,但是应该有助于将它们放在一起,并分别指出功能和恢复方案。

主要重新连接功能由CachingConnectionFactory本身启用。使用RabbitAdmin自动声明功能通常也很有益。另外,如果您关心保证传递,则可能还需要使用RabbitMessage和SimpleMessageListenerContainer中的channelTransacted标志以及SimpleMessageListenerContainer中的AcknowledgeMode.AUTO(如果您自己进行确认,则使用手册)。

3.3.18.1 自动声明交换,队列和绑定

RabbitAdmin组件可以在启动时声明交换,队列和绑定。它通过ConnectionListener懒惰地执行此操作。因此,如果broker在启动时不存在,则没有关系。第一次使用Connection(例如,通过发送消息)时,将触发侦听器并应用管理功能。在侦听器中执行自动声明的另一个好处是,如果由于任何原因(例如,代理死亡,网络故障等)而断开连接,则在重新建立连接时会再次应用它们。

以这种方式声明的队列必须具有固定的名称-由AnonymousQueue实例的框架明确声明或生成。匿名队列是非持久的,排他的和自动删除的。
仅当CachingConnectionFactory缓存模式为CHANNEL(默认)时,才执行自动声明。存在此限制是因为排他队列和自动删除队列绑定到该连接。
从版本2.2.2开始,RabbitAdmin将检测类型为DeclarationCustomizer的bean,并在实际处理声明之前应用该函数。例如,这对于在框架内具有一流支持之前设置新参数(属性)很有用。

1
2
3
4
5
6
7
8
9
@Bean
public DeclarableCustomizer customizer() {
return dec -> {
if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
dec.addArgument("some.new.queue.argument", true);
}
return dec;
};
}

在不直接访问Declarable bean定义的项目中,它也很有用。

3.3.18.2 同步操作失败和重试选项

如果您在使用RabbitTemplate时(例如)在同步序列中失去了与代理的连接,则Spring AMQP会引发AmqpException(通常但并非总是AmqpIOException)。我们不会试图掩盖存在问题的事实,因此您必须能够捕获并响应异常。如果您怀疑连接丢失(不是您的错),最简单的方法是再次尝试该操作。您可以手动执行此操作,也可以查看使用Spring Retry处理(命令式或声明式)重试。

Spring Retry提供了几个AOP拦截器,并提供了很大的灵活性来指定重试的参数(尝试次数,异常类型,退避算法等)。 Spring AMQP还为AMQP用例提供了一些方便的工厂bean,以方便的形式创建Spring Retry拦截器,并提供了可用于实现自定义恢复逻辑的强类型回调接口。有关更多详细信息,请参见Javadoc和StatefulRetryOperationsInterceptor和StatelessRetryOperationsInterceptor的属性。如果没有事务或在重试回调中启动了事务,则无状态重试是合适的。请注意,无状态重试比有状态重试更易于配置和分析,但是如果存在正在进行的事务必须回滚或肯定要回滚,则通常不适合使用。在事务中间断开连接应具有与回滚相同的效果。因此,对于在事务从堆栈开始的更高位置进行重新连接而言,有状态重试通常是最佳选择。有状态重试需要一种机制来唯一标识消息。最简单的方法是让发件人在MessageId消息属性中放置一个唯一值。提供的消息转换器提供了执行此操作的选项:您可以将createMessageIds设置为true。否则,您可以将MessageKeyGenerator实现注入到拦截器中。密钥生成器必须为每个消息返回唯一的密钥。在2.0版之前的版本中,提供了MissingMessageIdAdvice。它使没有messageId属性的消息仅被重试一次(忽略重试设置)。不再提供此建议,因为与spring-retry 1.2版一起,其功能已内置在拦截器和消息侦听器容器中。

为了向后兼容,默认情况下(在重试一次之后),具有空消息ID的消息被认为对消费者是致命的(消费者已停止)。若要复制MissingMessageIdAdvice提供的功能,可以在侦听器容器上将statefulRetryFatalWithNullMessageId属性设置为false。使用该设置,消费者继续运行,并且邮件被拒绝(在重试一次之后)。它被丢弃或路由到死信队列(如果已配置)。
从1.3版开始,提供了一个构建器API,以帮助使用Java(在@Configuration类中)组装这些拦截器。以下示例显示了如何执行此操作:

1
2
3
4
5
6
7
8

@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
}

只能以这种方式配置重试功能的子集。更多高级功能将需要将RetryTemplate配置为Spring bean。有关可用策略及其配置的完整信息,请参见Spring Retry Javadoc。

3.3.18.3 使用批处理侦听器重试

不建议使用批处理侦听器配置重试,除非该批处理由生产者在单个记录中创建。有关消费者和生产者创建的批次的信息,请参阅批次消息。对于用户创建的批处理,框架不知道该批处理中的哪个消息导致了故障,因此无法在重试用完后进行恢复。对于生产者创建的批次,由于只有一条消息实际上是失败的,因此可以恢复整个消息。应用程序可能希望通过设置抛出的异常的index属性来通知自定义恢复程序在批次中发生故障的位置。

批处理侦听器的重试恢复器必须实现MessageBatchRecoverer。

3.3.18.4 消息侦听器和异步情况

如果MessageListener因业务异常而失败,则由消息侦听器容器处理该异常,然后该容器将返回侦听另一条消息。如果故障是由断开的连接(不是业务异常)引起的,则必须取消并重新启动为侦听器收集消息的消费者。 SimpleMessageListenerContainer无缝处理此问题,并留下一条日志说正在重新启动侦听器。实际上,它无限循环,试图重新启动消费者。只有当消费者表现得很糟糕时,它才会放弃。副作用是,如果在容器启动时代理关闭,则它将继续尝试直到可以建立连接为止。

与协议错误和连接断开相反,业务异常处理可能需要更多考虑和一些自定义配置,尤其是在使用事务或容器确认的情况下。在2.8.x之前,RabbitMQ没有死信行为的定义。因此,默认情况下,由于业务异常而被拒绝或回滚的消息可以无限地重新发送。为了限制客户端的重传次数,一种选择是侦听器建议链中的StatefulRetryOperationsInterceptor。拦截器可以具有实现自定义死信操作的恢复回调-适用于您的特定环境的任何回调。

另一种选择是将容器的defaultRequeueRejected属性设置为false。这将导致所有失败的消息被丢弃。当使用RabbitMQ 2.8.x或更高版本时,这也有助于将消息传递给死信交换。

或者,可以引发AmqpRejectAndDontRequeueException。这样做可以防止消息重新排队,而不管defaultRequeueRejected属性的设置如何。

从2.1版开始,引入了InstantRequeueAmqpException以执行完全相反的逻辑:无​​论defaultRequeueRejected属性如何设置,都会重新排队消息。

通常,将两种技术结合使用。您可以将通知链中的StatefulRetryOperationsInterceptor与抛出AmqpRejectAndDontRequeueException的MessageRecoverer一起使用。当所有重试用尽后,将调用MessageRecover。 RejectAndDontRequeueRecoverer正是这样做的。默认的MessageRecoverer使用错误的消息并发出WARN消息。

从1.3版开始,提供了新的RepublishMessageRecoverer,以允许在重试用尽后发布失败的消息。

当恢复者使用了最后的例外时,该消息将被确认,并且不会发送到死信交换处(如果有的话)。

在消费者方使用RepublishMessageRecoverer时,接收到的消息在receiveDeliveryMode消息属性中具有deliveryMode。在这种情况下,deliveryMode为null。这意味着代理上的NON_PERSISTENT交付模式。从2.0版开始,您可以为deliveryMode配置RepublishMessageRecoverer,以将其设置为要重新发布的消息(如果为null)。默认情况下,它使用MessageProperties的默认值-MessageDeliveryMode.PERSISTENT。
以下示例显示如何将RepublishMessageRecoverer设置为恢复器:

1
2
3
4
5
6
7
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
.build();
}

RepublishMessageRecoverer在消息头中发布消息以及其他信息,例如异常消息,堆栈跟踪,原始交换和路由键。可以通过创建子类并覆盖AdditionalHeaders()来添加其他标头。也可以在additionalHeaders()中更改deliveryMode(或任何其他属性),如以下示例所示:

1
2
3
4
5
6
7
8
9
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {

protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
message.getMessageProperties()
.setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
return null;
}

};

从2.0.5版开始,如果堆栈跟踪太大,则可能会被截断;这是因为所有标头都必须放在一个框架中。默认情况下,如果堆栈跟踪将导致少于20,000字节(“余量”)可用于其他标头,则它将被截断。如果您需要更多或更少的空间放置其他标头,则可以通过设置恢复程序的frameMaxHeadroom属性进行调整。从版本2.1.13、2.2.3开始,此计算中包括异常消息,并且使用以下算法将最大化堆栈跟踪量:

如果仅堆栈跟踪将超出限制,则异常消息标头将被截断为97个字节加……且堆栈跟踪也将被截断。

如果堆栈跟踪很小,则消息将被截断(加……)以适合可用字节(但是堆栈跟踪本身内的消息将被截断为97字节加…)。

每当发生任何截断时,都会记录原始异常以保留完整的信息。

从2.1版开始,添加了InstantRequeueMessageRecoverer以引发InstantRequeueAmqpException,该异常通知侦听器容器重新排队当前失败的消息。

3.3.18.5 Spring重试的异常分类

Spring Retry在确定哪些异常可以调用重试方面具有很大的灵活性。默认配置将重试所有异常。鉴于用户异常被包装在ListenerExecutionFailedException中,我们需要确保分类检查异常原因。默认分类器仅查看顶级异常。

从Spring Retry 1.0.3开始,BinaryExceptionClassifier具有一个名为traverseCauses的属性(默认值:false)。为true时,它将遍历异常原因,直到找到匹配项或没有原因为止。

要使用此分类器进行重试,可以使用通过构造函数创建的SimpleRetryPolicy,该构造函数采用最大尝试次数,异常映射实例和布尔值(traverseCauses),然后将此策略注入RetryTemplate。

3.3.19 Debugging

Spring AMQP提供了广泛的日志记录,尤其是在DEBUG级别。

如果要监视应用程序和代理之间的AMQP协议,可以使用诸如WireShark之​​类的工具,该工具具有用于解码协议的插件。另外,RabbitMQ Java客户端带有一个非常有用的类,称为Tracer。默认情况下,作为主服务器运行时,它将侦听端口5673并连接到本地主机上的端口5672。您可以运行它并更改连接工厂配置以连接到本地主机上的端口5673。它在控制台上显示解码的协议。有关更多信息,请参考Tracer Javadoc。

其他

从2.2版开始,回调是在连接工厂的执行程序线程之一上调用的,在之前的版本中回调直接在amqp-client连接I / O线程上调用;如果执行某些RPC操作(例如打开新通道),则会死锁,因为I / O线程阻塞了等待结果,但是结果需要由I / O线程本身处理。对于那些版本,有必要将工作(例如发送消息)移交给回调中的另一个线程。由于框架现在将回调调用移交给了执行程序,因此不再需要此操作。