RabbitMQ高级学习笔记

学习时间:2024年7月30日

1 生产者的可靠性

消息丢失的可能性:

image-20240716144652512

  • 发送消息时丢失:
    • 生产者发送消息时连接MQ失败
    • 生产者发送消息到达MQ后未找到Exchange
    • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
    • 消息到达MQ后,处理消息的进程发生异常
  • MQ导致消息丢失:
    • 消息到达MQ,保存到队列后,尚未消费就突然宕机
  • 消费者处理消息时:
    • 消息接收后尚未处理突然宕机
    • 消息接收后处理过程中抛出异常

1.1 重试机制

首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。

为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。

修改publisher模块的application.yaml文件,添加下面的内容:

1
2
3
4
5
6
7
8
9
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数

利用命令停掉RabbitMQ服务后,测试发送一条消息,会发现会每隔1秒重试1次,总共重试了3次。

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。

如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

1.2 确认机制

一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。

不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:

  • MQ内部处理消息的进程发生了异常
  • 生产者发送消息到达MQ后未找到Exchange
  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由

针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher ConfirmPublisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息(Publish Confirm),代表投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功(Publish Confirm
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功(Publish Confirm
  • 其它情况都会返回NACK,告知投递失败(Publish Confirm

image-20240806112537587

其中acknack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。

默认两种机制都是关闭状态,需要通过配置文件来开启。

1.3 确认机制实现

1.3.1 开启生产者确认

在publisher模块的application.yaml中添加配置:

1
2
3
4
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执

一般我们推荐使用correlated,回调机制。

1.3.2 定义ReturnCallback

暂略

1.3.3 定义ConfirmCallback

暂略

2 MQ的可靠性

消息到达MQ以后,如果MQ不能及时保存,也会导致消息丢失,所以MQ的可靠性也非常重要。

2.1 数据持久化

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化
  • 队列持久化
  • 消息持久化

2.1.1 交换机持久化

在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数:

image-20240806112209885

设置为Durable就是持久化模式,Transient就是临时模式。

2.1.2 队列持久化

在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数:

image-20240806112232334

2.1.3 消息持久化

在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个properties

image-20240806112252451

2.2 Lazy Queue

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:

  • 消费者宕机或出现网络故障
  • 消息发送量激增,超过了消费者处理速度
  • 消费者处理业务发生阻塞

一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOutPageOut会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。

为了解决这个问题,从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储

而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。

基于注解来声明队列并设置为Lazy模式:

1
2
3
4
5
6
7
8
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);
}

3 消费者的可靠性

当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:

  • 消息投递的过程中出现了网络故障
  • 消费者接收到消息后突然宕机
  • 消费者接收到消息后,因处理不当导致异常

一旦发生上述情况,消息也会丢失。因此,RabbitMQ必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息。

3.1 确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活
  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack;当业务出现异常时,根据异常判断返回不同结果:
    • 如果是业务异常,会自动返回nack
    • 如果是消息处理或校验异常,自动返回reject;
1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 不做处理
1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 自动ack

3.2 重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。

极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力。

配置:

1
2
3
4
5
6
7
8
9
10
11
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃

4 延迟消息

延迟消息(Delayed Message)是一种机制,使得消息不会立即被消费者接收到,而是经过一段指定的延迟时间后才会被投递。实现延迟消息的常见方法有两种:利用TTL(Time-To-Live)和死信队列(DLX,Dead-Letter Exchange)组合,或者使用RabbitMQ的延迟消息插件(rabbitmq_delayed_message_exchange)。

4.1 TTL和DLX

数据流方向:

1
Producer --> 普通交换机 --> 普通队列 --超时-> 死信交换机 --> 死信队列 --> Consumer
  • 定义一个普通交换机和队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
public class MqConfig {
// 普通交换机
@Bean
public DirectExchange normalExchange() {
return new DirectExchange("normal_exchange");
}

// 普通队列,设置TTL和DLX
@Bean
public Queue normalQueue() {
return QueueBuilder.durable("normal_queue")
.withArgument("x-dead-letter-exchange", "dlx_exchange") // 设置死信交换机
.withArgument("x-dead-letter-routing-key", "dlx_routing_key") // 设置死信路由键
.withArgument("x-message-ttl", 60000) // 设置消息的TTL为60秒
.build();
}

// 绑定普通队列和交换机
@Bean
public Binding normalBinding() {
return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normal_routing_key");
}
}
  • 定义一个死信交换机和队列:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
public class DlxConfig {
// 死信交换机
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange("dlx_exchange");
}

// 死信队列
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable("dlx_queue").build();
}

// 绑定死信队列和交换机
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx_routing_key");
}
}
  • 发送消息到普通队列
1
2
3
4
5
6
7
8
9
10
11
12
13
@RestController
public class Producer {

@Autowired
private RabbitTemplate rabbitTemplate;

@PostMapping("/send")
public void sendMessage() {
String message = "Hello, delayed message!";
rabbitTemplate.convertAndSend("normal_exchange", "normal_routing_key", message);
}

}
  • 死信队列中接收消息:
1
2
3
4
5
6
7
8
9
@Component
public class Consumer {

@RabbitListener(queues = "dlx_queue")
public void listenToSimpleQueue(String msg) {
System.out.println("接收到消息: " + msg);
}

}

4.2 延迟消息插件

4.3 应用场景示例

假设我们有一个在线购物平台,当用户下单后,系统会生成一个订单。如果用户在30分钟内没有完成支付,系统会自动取消该订单。

image-20240806140638225

使用TTL和DLX实现订单超时取消:

  • 定义普通交换机、队列并设置TTL和DLX
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 普通交换机
@Bean
public DirectExchange orderExchange() {
return new DirectExchange("order_exchange");
}

// 普通队列,设置TTL和DLX
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order_queue")
.withArgument("x-dead-letter-exchange", "dlx_exchange") // 设置死信交换机
.withArgument("x-dead-letter-routing-key", "order_dlx_routing_key") // 设置死信路由键
.withArgument("x-message-ttl", 1800000) // 设置消息的TTL为30分钟
.build();
}

// 绑定普通队列和交换机
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order_routing_key");
}
  • 定义死信交换机和队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 死信交换机
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange("dlx_exchange");
}

// 死信队列
@Bean
public Queue orderDlxQueue() {
return QueueBuilder.durable("order_dlx_queue").build();
}

// 绑定死信队列和交换机
@Bean
public Binding orderDlxBinding() {
return BindingBuilder.bind(orderDlxQueue()).to(dlxExchange()).with("order_dlx_routing_key");
}
  • 发送订单消息到普通队列
1
2
3
4
5
6
7
@Autowired
private RabbitTemplate rabbitTemplate;

public void createOrder(Order order) {
// 发送订单消息到普通队列
rabbitTemplate.convertAndSend("order_exchange", "order_routing_key", order);
}
  • 处理死信队列中的消息
1
2
3
4
5
6
7
8
9
10
@RabbitListener(queues = "order_dlx_queue")
public void handleExpiredOrder(Order order) {
// 处理超时未支付的订单,例如取消订单
cancelOrder(order);
}

private void cancelOrder(Order order) {
// 取消订单的逻辑
System.out.println("Order " + order.getId() + " has been canceled due to timeout.");
}