消息队列死信堆积怎么排查和清空

SQS / RabbitMQ / Kafka 的 DLQ 不停涨。通过分类失败、修毒丸消息根因、加退避重试预算来处理。

SQS 死信队列上周还只有 3 条消息,今天就到了 8000。或者 RabbitMQ DLX 在不断积压。或者 Kafka __consumer_offsets retry topic 没边界。每一次激增都对应一类消费者处理不了的消息——schema 不匹配、下游超时、payload 错位、或者逻辑 bug。任 DLQ 静悄悄涨下去,就是在丢业务,还可能违反留存 SLA。修复方向:先分类失败、修毒丸消息根因、加带预算的退避重试。

常见原因

按踩坑频率排序。

1. 生产者和消费者 schema 漂移

生产者加了个新必填字段,消费者解析在旧代码路径上抛异常。消费者没发版之前,每条新消息都失败。

怎么判断:抽几条 DLQ 消息看看。都来自同一个生产者版本,多了一个消费者解析不了的字段。

2. 下游 API 在超时

消费者要扇出到一个第三方 API,对方开始变慢。每条消息 30 秒超时,重试 N 次,进 DLQ。

怎么判断:DLQ 增长跟下游延迟同步。元数据里的失败原因写着 timeout。

3. 单条坏消息毒到整个队列

一条畸形消息让消费者进程崩;消费者重启,又拿到同一条,又崩。死循环。

怎么判断:消费者日志反复崩在同一个 payload hash 上。SQS ApproximateReceiveCount 接近上限。

4. 重试预算太宽,掩盖了真正的失败

maxReceiveCount = 100 意味着每条坏消息要试 100 次才进 DLQ。真问题几天后才被发现。

怎么判断:查队列策略。maxReceiveCount 超过 10 = 预算太宽。

5. DLQ 没监控也没告警

DLQ 已经涨了几周,没人收到 page。等有人注意到,已经积了 5 万条。

怎么判断:CloudWatch / Datadog 没有 DLQ 深度告警。

6. 事故期间消费者被缩容

低 CPU 时自动伸缩把消费者数量降下来;主队列堆积,visibility 超时,消息重入队,最终达到 DLQ 阈值。

怎么判断:DLQ 增长时间戳跟消费者 scale-in 事件吻合。

动手前先确认

  • 把 DLQ 深度和增长率做个快照;抓 10 条样本。
  • 找到 DLQ 归属的消费者服务。
  • 确认处理是否幂等(能安全重放)还是不能(重放有风险)。
  • 重放前按失败类别打标。
  • 需要修 schema 时跟生产者团队同步。

需要收集的信息

  • DLQ 大小、增长速率、最老消息的年龄。
  • 10-20 条样本消息的 body + 元数据(ReceiveCount、ApproximateFirstReceiveTimestamp)。
  • DLQ 开始增长那段时间的消费者日志。
  • 生产者最近的发布。
  • 这段时间下游服务的健康度。

分步修复

Step 1:抽样并分类

# SQS:接收但不删除
aws sqs receive-message \
  --queue-url $DLQ_URL \
  --max-number-of-messages 10 \
  --visibility-timeout 30 \
  --attribute-names All \
  --message-attribute-names All > sample.json

每条样本归类:

  • Schema 不匹配(特定生产者版本、缺字段)
  • 下游超时(错误元数据里有提)
  • JSON / 编码畸形(parse error)
  • 业务逻辑失败(校验拒绝)
  • 未知(读 payload)

每一类修法不同。

Step 2:修 schema 漂移

// 接收时宽容
import { z } from 'zod';

const MessageSchema = z.object({
  id: z.string(),
  userId: z.string(),
  // 新字段,可选 + 合理默认值
  source: z.string().optional().default('unknown'),
  // 旧字段,向后兼容
  type: z.string(),
});

function parse(raw: string) {
  try {
    return MessageSchema.parse(JSON.parse(raw));
  } catch (err) {
    metrics.counter('mq_parse_failure').inc({ reason: err.message });
    throw err;
  }
}

加字段时消费者先发。新字段一开始全部标可选。

Step 3:下游调用加单条超时

async function processMessage(msg: Message) {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), 10000);
  
  try {
    await fetch(downstream, { signal: controller.signal });
  } finally {
    clearTimeout(timer);
  }
}

没有单调用超时时,一个慢下游会把整个消费者池堵死。

Step 4:收紧重试预算

# SQS:收紧 redrive 策略
aws sqs set-queue-attributes \
  --queue-url $MAIN_URL \
  --attributes '{
    "RedrivePolicy": "{\"deadLetterTargetArn\":\"<dlq-arn>\",\"maxReceiveCount\":\"5\"}",
    "VisibilityTimeout": "60"
  }'

5 次是常见默认值。失败 5 次后进 DLQ + 告警是合理路径。

RabbitMQ:

channel.assertQueue('main', {
  arguments: {
    'x-dead-letter-exchange': 'dlx',
    'x-dead-letter-routing-key': 'failed',
    'x-message-ttl': 300000,
  },
});

Step 5:根因修好后回放安全消息

// SQS:把 DLQ 回放到主队列
import { SQSClient, ReceiveMessageCommand, SendMessageCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs';

async function replayDLQ(client: SQSClient, dlqUrl: string, mainUrl: string) {
  while (true) {
    const { Messages } = await client.send(new ReceiveMessageCommand({
      QueueUrl: dlqUrl,
      MaxNumberOfMessages: 10,
      WaitTimeSeconds: 5,
    }));
    if (!Messages || Messages.length === 0) break;
    
    for (const m of Messages) {
      await client.send(new SendMessageCommand({
        QueueUrl: mainUrl,
        MessageBody: m.Body!,
        MessageAttributes: m.MessageAttributes,
      }));
      await client.send(new DeleteMessageCommand({
        QueueUrl: dlqUrl,
        ReceiptHandle: m.ReceiptHandle!,
      }));
    }
  }
}

确认根因修好之后再回放。先抽小样验证。

Step 6:把毒丸消息隔离

// 跟踪接收次数,超过 N 次进永久隔离
if (msg.attributes.ApproximateReceiveCount > 3) {
  await sendToQuarantine(msg);
  return;
}

隔离队列不自动重试。运维人工看再决定。

Step 7:加 DLQ 深度告警

# CloudWatch alarm
alarm_name: dlq-depth-high
metric: ApproximateNumberOfMessagesVisible
queue: my-service-dlq
threshold: 10
period: 300
comparison: GreaterThanThreshold

DLQ 持续 5 分钟超过 10 条 = page on-call。

验证

  • 回放后 DLQ 深度回到基线(一般是 0)。
  • 24 小时内消费者错误率稳在 0.1% 以下。
  • 抽 10 条成功消息,确认解析路径对新旧 schema 都有效。
  • 下游延迟 p99 稳在超时阈值之内。

长期预防

  • 生产环境每个队列都必须有 DLQ 告警。
  • 消费者 schema 默认可选字段;生产者后发。
  • 全公司统一重试预算:3-5 次进 DLQ。
  • 季度复盘 DLQ:任何 DLQ 大于 0 都要追查。
  • 处理默认幂等;回放要安全。

容易踩的坑

  • maxReceiveCount 来”争取时间”——只是延后看到真问题。
  • 没修根因就回放 DLQ——同样的消息立刻回来。
  • 没有幂等 key,回放会重复处理。
  • DLQ 一直晾到季度末;消息过期就丢了。

FAQ

看不懂的 DLQ 消息能直接删吗? 不能。挪到隔离队列里慢慢看。删了就丢信号。

DLQ 多久排一次? 理想是从不——目标是 DLQ 永远为 0。一涨就当天修。

能不能让 DLQ 自动回放? 谨慎情况下可以:只针对已经根因清楚的失败类别(比如下游故障已经恢复)。未知失败永远别自动回放。

相关阅读

标签: #后端 #排查 #message-queue