消息应答-不公平分发
2025-4-20
| 2025-4-20
Words 1267Read Time 4 min
type
status
date
slug
summary
tags
category
icon
password
消息处理就会有消息丢失的可能,如果工作线程没有解决,MQ 又把消息删除了,那么消息就丢失了。所以引入了消息应答的机制,如果一个工作线程对一个消息处理完成了,那么他需要告诉 mq 他已经处理了,mq 就可以把消息删除了。

自动应答

消息被发送出去后立即被认为已经传送成功了,这种模式需要在高吞吐量和数据传输安全性方面做出权衡,因为这种模式如果在消息接收到之前,连接通道被关闭了,那么消息就被丢失了。或者工作线程的服务器因为过载,导致内存耗尽,也会使得消息的丢失,所以这种自动应答的方式适合良好的环境,不适合极端环境的情况。

手动应答

消息应答的方法
A.Channel.basicAck(用于肯定确认)
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
B.Channel.basicNack(用于否定确认)
C.Channel.basicReject(用于否定确认)
与 Channel.basicNack 相比少一个参数,批量处理,建议不批量应答,只处理当前应答的那个消息。
不处理该消息了直接拒绝,可以将其丢弃了
防止消息丢失
消息自动重新入队
如果消息没有被处理成功。那么此消息会被重新入队。

演示消息处理异常,消息不会丢失

消息在手动应答时是不丢失的,放回队列中,重新消费。
原理:生产者发送消息,两个消费者消费消息,一个消费者睡眠 10 秒,一个消费者睡眠 30 秒,当睡眠 30 秒的工作线程在处理消息的时候突然终止了,那么会不会把消息分配给另一个消费者消费。 生产者

消费者

RabitMQ 持久化

队列持久化

只需要在声明队列的时候,把持久化的队列持久化即可,但如果该队列已经被设置为不持久化了,需要先删除之后再重新进行创建队列。
mq 后台管理可以看到
notion image
image-20230614105203342

消息持久化

队列持久化是在声明队列的时候初始化队列为持久化,那么消息持久化就是在生产者发消息的时候就通知队列该消息是持久化的。

不公平分发

默认情况下消费者是轮询的方式消费消息,但如果一个消费者消费时间较长,一个较短,采用轮询的方式就会导致一个消费者长期出于空闲,一个一直在处理消息。所以建议设置为不公平分发,能者多劳。
消费者端设置
预取值
预取值是和上面的prefetchCount有关系的,设置为 0 代表轮询,1代表不公平,还可以设置某个消费者可以预先拿到多少的消息。比如设置 5,就是信道中可以预先拿到 5 条消息。
满减活动消息发布确认
Loading...