RabbitMQ 消费者连着却不消费、队列一直涨

RabbitMQ 显示消费者健康连接、队列却越堆越多。修 prefetch、unacked 消息和死信路由,把卡住的消费者救出来。

RabbitMQ 控制台里消费者连接是绿的、消息也在投递,但队列深度一直涨、下游啥都没在做。最常见的原因是消息卡在了”unacked”桶里——消费者拉了 prefetch_count 条没 ack,broker 就不再投了。另一种常见情况是消费者拉了消息就崩、没 nack,又没配死信,于是消息一遍遍重投给同一个会崩的消费者。修法:显式 basic_qos、配死信交换机、告警盯”unacked”而不是”ready”深度。

常见原因

按踩坑频率排序。

1. Prefetch 被未 ack 消息占满

消费者要了 prefetch_count = 100,处理得慢。100 条 unacked 全堆着等它消化一条。broker 不再投了。

怎么判断:RabbitMQ UI 里队列 Unacked 列基本等于消费者的 prefetch 总和;Ready 还在涨。

2. 消费者拉了消息后崩掉

worker 拉了消息、panic 或者 OOM 被 kill,没发 basic_nack。RabbitMQ 把它算 unacked,等通道关了再投给下一个 worker,下一个也崩——毒丸消息死循环。

怎么判断:每条消息都 redelivered=True。每次投递 CPU 都飙一下。没有进度。

3. 关掉了手动 ack——auto_ack 把失败吞了

auto_ack=True(也就是 no-ack 模式)一投递就 ack 掉。处理失败消息也没了。队列深度看着没事、数据其实悄悄丢了。

怎么判断:unacked 不涨、broker 没报错,但消费方反馈数据莫名其妙不全。

4. 单线程消费者却用高 prefetch

Python 单线程消费者配 prefetch=200,永远一次只处理一条。剩下 199 条占着位子。

怎么判断:worker CPU 接近 0,处理速率等于慢任务一条一条来。

5. broker flow control 暂停了通道

内存或磁盘水位超了,broker 暂停发布和消费。消费者看着是连着的、消息就是不流。

怎么判断rabbitmqctl list_connections name state 显示 flow,或者管理 UI 显示节点 memory/disk alarm。

最短修复路径

Step 1: 显式设 prefetch

prefetch 跟实际并发挂钩。常用规则:prefetch = 并发 * 2

Python(pika):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
channel = connection.channel()
channel.basic_qos(prefetch_count=10)   # 按通道

def on_message(ch, method, properties, body):
    try:
        process(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

channel.basic_consume(queue='jobs', on_message_callback=on_message, auto_ack=False)
channel.start_consuming()

Node(amqplib):

await channel.prefetch(10);
await channel.consume('jobs', async (msg) => {
  try {
    await process(msg.content);
    channel.ack(msg);
  } catch (e) {
    channel.nack(msg, false, false);   // 进死信、不 requeue
  }
}, { noAck: false });

Step 2: 配死信交换机

毒丸消息得有去处。让工作队列在 nack/reject 时进死信。

# 声明 DLX 和 DLQ
rabbitmqadmin declare exchange name=jobs.dlx type=fanout
rabbitmqadmin declare queue name=jobs.dlq
rabbitmqadmin declare binding source=jobs.dlx destination=jobs.dlq

# 主队列声明带 DLX
rabbitmqadmin declare queue name=jobs arguments='{"x-dead-letter-exchange":"jobs.dlx","x-delivery-limit":5}'

x-delivery-limit(RabbitMQ 3.10+ quorum queue 支持)限制重投次数。失败 5 次进 DLQ,毒丸死循环到此为止。

经典队列里得在应用代码里做:读 x-death header 计数,超过 N 次就 basic_nack(requeue=False)

Step 3: 告警盯 unacked、不是 ready

rabbitmqctl list_queues name messages_ready messages_unacknowledged consumers

Prometheus exporter 规则:

- alert: RabbitMQUnackedHigh
  expr: rabbitmq_queue_messages_unacked{queue="jobs"} > 50 and rate(rabbitmq_queue_messages_published_total[5m]) > 0
  for: 10m
  labels:
    severity: warning
- alert: RabbitMQQueueDepthGrowing
  expr: rate(rabbitmq_queue_messages_ready[10m]) > 0 and rabbitmq_queue_consumers > 0
  for: 15m

Step 4: 看看 broker 是不是在 flow control

rabbitmqctl list_connections name state user
rabbitmqctl status | grep -E 'mem|disk'

state 是 flow 说明 broker 在限流。提水位或者削峰。

# 看 alarm
rabbitmqctl list_node_alarms

Step 5: 用 quorum queue 提稳定性

经典 mirrored queue 已经被弃用;quorum queue 在重投和持久化上行为更可预测。

rabbitmqadmin declare queue name=jobs durable=true arguments='{"x-queue-type":"quorum","x-dead-letter-exchange":"jobs.dlx","x-delivery-limit":5}'

3.8 起支持,写这篇时 3.13 是 stable。

预防

  • 永远 auto_ack=false;成功才 ack,失败 nack 到 DLQ。
  • prefetch_count = worker 并发 * 2,永远别用默认。
  • 每个工作队列都配死信和投递上限。
  • 告警盯 messages_unacknowledged 和重投模式。
  • 新建工作队列优先 quorum queue。

标签: #后端 #排查 #rabbitmq