RabbitMQ 控制台里消费者连接是绿的、消息也在投递,但队列深度一直涨、下游啥都没在做。最常见的原因是消息卡在了”unacked”桶里——消费者拉了 prefetch_count 条没 ack,broker 就不再投了。另一种常见情况是消费者拉了消息就崩、没 nack,又没配死信,于是消息一遍遍重投给同一个会崩的消费者。修法:显式 basic_qos、配死信交换机、告警盯”unacked”而不是”ready”深度。
常见原因
按踩坑频率排序。
1. Prefetch 被未 ack 消息占满
消费者要了 prefetch_count = 100,处理得慢。100 条 unacked 全堆着等它消化一条。broker 不再投了。
怎么判断:RabbitMQ UI 里队列 Unacked 列基本等于消费者的 prefetch 总和;Ready 还在涨。
2. 消费者拉了消息后崩掉
worker 拉了消息、panic 或者 OOM 被 kill,没发 basic_nack。RabbitMQ 把它算 unacked,等通道关了再投给下一个 worker,下一个也崩——毒丸消息死循环。
怎么判断:每条消息都 redelivered=True。每次投递 CPU 都飙一下。没有进度。
3. 关掉了手动 ack——auto_ack 把失败吞了
auto_ack=True(也就是 no-ack 模式)一投递就 ack 掉。处理失败消息也没了。队列深度看着没事、数据其实悄悄丢了。
怎么判断:unacked 不涨、broker 没报错,但消费方反馈数据莫名其妙不全。
4. 单线程消费者却用高 prefetch
Python 单线程消费者配 prefetch=200,永远一次只处理一条。剩下 199 条占着位子。
怎么判断:worker CPU 接近 0,处理速率等于慢任务一条一条来。
5. broker flow control 暂停了通道
内存或磁盘水位超了,broker 暂停发布和消费。消费者看着是连着的、消息就是不流。
怎么判断:rabbitmqctl list_connections name state 显示 flow,或者管理 UI 显示节点 memory/disk alarm。
最短修复路径
Step 1: 显式设 prefetch
prefetch 跟实际并发挂钩。常用规则:prefetch = 并发 * 2。
Python(pika):
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
channel = connection.channel()
channel.basic_qos(prefetch_count=10) # 按通道
def on_message(ch, method, properties, body):
try:
process(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
channel.basic_consume(queue='jobs', on_message_callback=on_message, auto_ack=False)
channel.start_consuming()
Node(amqplib):
await channel.prefetch(10);
await channel.consume('jobs', async (msg) => {
try {
await process(msg.content);
channel.ack(msg);
} catch (e) {
channel.nack(msg, false, false); // 进死信、不 requeue
}
}, { noAck: false });
Step 2: 配死信交换机
毒丸消息得有去处。让工作队列在 nack/reject 时进死信。
# 声明 DLX 和 DLQ
rabbitmqadmin declare exchange name=jobs.dlx type=fanout
rabbitmqadmin declare queue name=jobs.dlq
rabbitmqadmin declare binding source=jobs.dlx destination=jobs.dlq
# 主队列声明带 DLX
rabbitmqadmin declare queue name=jobs arguments='{"x-dead-letter-exchange":"jobs.dlx","x-delivery-limit":5}'
x-delivery-limit(RabbitMQ 3.10+ quorum queue 支持)限制重投次数。失败 5 次进 DLQ,毒丸死循环到此为止。
经典队列里得在应用代码里做:读 x-death header 计数,超过 N 次就 basic_nack(requeue=False)。
Step 3: 告警盯 unacked、不是 ready
rabbitmqctl list_queues name messages_ready messages_unacknowledged consumers
Prometheus exporter 规则:
- alert: RabbitMQUnackedHigh
expr: rabbitmq_queue_messages_unacked{queue="jobs"} > 50 and rate(rabbitmq_queue_messages_published_total[5m]) > 0
for: 10m
labels:
severity: warning
- alert: RabbitMQQueueDepthGrowing
expr: rate(rabbitmq_queue_messages_ready[10m]) > 0 and rabbitmq_queue_consumers > 0
for: 15m
Step 4: 看看 broker 是不是在 flow control
rabbitmqctl list_connections name state user
rabbitmqctl status | grep -E 'mem|disk'
state 是 flow 说明 broker 在限流。提水位或者削峰。
# 看 alarm
rabbitmqctl list_node_alarms
Step 5: 用 quorum queue 提稳定性
经典 mirrored queue 已经被弃用;quorum queue 在重投和持久化上行为更可预测。
rabbitmqadmin declare queue name=jobs durable=true arguments='{"x-queue-type":"quorum","x-dead-letter-exchange":"jobs.dlx","x-delivery-limit":5}'
3.8 起支持,写这篇时 3.13 是 stable。
预防
- 永远
auto_ack=false;成功才 ack,失败 nack 到 DLQ。 prefetch_count= worker 并发 * 2,永远别用默认。- 每个工作队列都配死信和投递上限。
- 告警盯
messages_unacknowledged和重投模式。 - 新建工作队列优先 quorum queue。