Kafka consumer lag 一直涨,扩 consumer 也没用

扩了一堆 consumer pod,lag 还在往上走。瓶颈基本不是 consumer 不够多,而是 partition 数、poison message、或者 commit offset 漂移。

一个 topic 每秒生产 5 万条消息。你的 consumer group 之前还跟得上,流量翻倍之后 lag 就开始爬,你把 consumer Deployment 从 8 个 pod 扩到 24 个。lag 还是涨,部分 pod 现在 CPU 在 0%。Kafka UI 显示 group 是 Stable,但从 produce 到 commit 的端到端延迟从 200 ms 涨到 14 分钟,还在涨。

加 consumer 这个直觉,只在 consumer 是 CPU bound 而且没吃满的时候才对。其他情况下基本都是错的方向。这篇讲怎么找到真正的瓶颈、怎么区分。

常见原因

按命中率从高到低排。

1. consumer 数比 partition 数还多

一个 partition 在同一个 group 内只能被一个 consumer 消费。topic 有 12 个 partition,你开 24 个 consumer,那有 12 个就是闲着。继续加完全没意义。

怎么发现kafka-consumer-groups.sh --describe --group orders-consumer 看 partition、current-offset、consumer-id。如果好几个 consumer-id 没分到 partition,瓶颈就是 partition 数。

2. 有一条 poison message 把某个 partition 卡住

某个 partition 里有一条消息处理失败,consumer 就一直重试它(或者长 backoff 后重试),永远不 commit 越过它。后面所有消息都在排队。其他 partition 看着都正常,只有这一个堵了。

怎么发现:group 的 lag 集中在 1-2 个 partition 上,其他都是 0。日志里同一个 offset 在反复重试。

3. commit 之前要等慢下游写完

consumer 拉得很快,但每条消息处理都要同步写一个慢下游(Postgres insert、外部 API、embedding 模型调用)。端到端吞吐等于下游吞吐,跟 Kafka 没关系。consumer pod CPU 一直很低。

怎么发现:consumer CPU 不到 30%,下游服务的 p99 远高于你这一条消息的预算(大概是 messages_per_second / num_partitions)。

4. rebalance 在反复抖

每次扩容或者 pod 重启,Kafka 都会暂停整个 group、重新分配 partition、再恢复。如果 max.poll.interval.ms 太短或者消息处理特别慢,rebalance 就会不停触发。group 大部分时间在 rebalance 而不是在消费。

怎么发现:consumer 日志里反复出现 Attempt to heartbeat failedMember ... sending LeaveGroupkafka-consumer-groups.sh 看 group 状态在 StablePreparingRebalance 之间反复横跳。

5. max.poll.records 调得太高

一次 poll 5000 条,处理要 90 秒。这 90 秒内一次心跳都没发,broker 把这个 consumer 踢出去,group rebalance,这批活又得重来。lag 上下剧烈震荡。

怎么发现:lag 一会儿掉一会儿冲,反复。日志里 Auto-offset-commit failed 或者 This consumer instance is no longer part of the group

6. producer 倾斜——所有消息都落在一个 partition

producer 用的 key hash 到了少数几个 partition,或者完全没 key、配 sticky partitioner 又赶上突发流量。某一个 partition 拿到 80% 的流量。无论开多少 consumer,那个 partition 只能被其中一个吃。

怎么发现kafka-topics.sh --describe --topic orders 加上各 partition 的 produce 监控。如果一个 partition 的 produce 速率是其他的 10 倍,就是倾斜。

7. producer 和 consumer 的压缩配置不匹配

producer 用 zstd 发,consumer 的 fetch.max.bytes 太小,一次装不下一个完整的解压 batch。结果就是 consumer 拉一小撮、解压、处理、再拉,吞吐崩盘。

怎么发现:consumer 这边的网络吞吐远低于 partition 实际生产的量,CPU 大头都在解压上。

最短修复路径

第 1 步:先量清楚 lag 到底在哪儿

kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group orders-consumer

重点看每个 partition 的 LAG 那一列。如果 99% 的 lag 都在一个 partition 上,那就是 poison message 或者 producer 倾斜。如果 lag 在所有 partition 上均匀分布,那是吞吐问题。

第 2 步:对比一下 partition 数和 consumer 数

kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic orders

如果 partition < consumer,再扩 consumer 是浪费。先加 partition:

kafka-topics.sh --bootstrap-server kafka:9092 \
  --alter --topic orders --partitions 48

partition 数只能加不能减。挑一个能给你 2-4 倍扩容余量的数。

第 3 步:用 dead letter 模式处理 poison message

在 consumer 里给每条消息设一个重试预算。失败 N 次之后扔到 DLQ topic,然后 commit 往前走。

try {
  await processMessage(message);
} catch (err) {
  const attempts = (message.headers?.attempts ?? 0) + 1;
  if (attempts >= 3) {
    await producer.send({
      topic: 'orders.dlq',
      messages: [{ ...message, headers: { ...message.headers, attempts, lastError: err.message } }]
    });
  } else {
    throw err;  // 下一次 poll 会重试
  }
}
await consumer.commitOffsets([{ topic, partition, offset: message.offset + 1 }]);

绝对不要让单条坏消息永远堵住一个 partition。

第 4 步:max.poll.recordsmax.poll.interval.ms 一起调

关系是:max.poll.records * 单条平均处理时间 < max.poll.interval.ms

max.poll.records: 500
max.poll.interval.ms: 300000   # 5 分钟
session.timeout.ms: 45000
heartbeat.interval.ms: 3000

batch 小一点 commit 更勤,handler 慢也不会被踢。

第 5 步:如果是下游瓶颈,就 batch 写

不要每条消息一次 DB insert,攒 200 条做一次 bulk insert。bulk insert 成功之后再 commit。

const batch = [];
for await (const message of consumer) {
  batch.push(message);
  if (batch.length >= 200) {
    await db.bulkInsert(batch.map(parse));
    await consumer.commitOffsets(lastOffsetFor(batch));
    batch.length = 0;
  }
}

这一步通常是收益最大的一步。

第 6 步:换更好的 partition key 修 producer 倾斜

如果你的 key 是 userId,而 0.1% 的用户产生了 50% 的事件,那 partition 永远会倾斜。要么换 key(event id,或者能把负载摊开的复合 key),要么把热用户显式 shard。

const key = isHotUser(userId) ? `${userId}:${randomShard()}` : userId;
producer.send({ topic, messages: [{ key, value }] });

第 7 步:换 cooperative-sticky assignor 减轻 rebalance

默认的 range 或 round-robin assignor 一 rebalance 就 stop-the-world。cooperative-sticky 会尽量保留原有的分配。

partition.assignment.strategy: org.apache.kafka.clients.consumer.CooperativeStickyAssignor

扩缩容不再让整个 group 停摆。

这种情况不怪你

broker 端的限流会卡住整个 consumer group,不管你客户端怎么调。如果集群配了 per-client 的 consumer_byte_rate quota 并且你撞上了,再怎么调 consumer 都没用。看 kafka.server:type=Fetchkafka.server:type=ClientQuotaManager 的 JMX 指标,或者问下管集群的人。

集群本身资源不够也是真实原因:broker 磁盘或者网络打满,fetch 就慢,跟你 consumer 数没关系。

容易被误诊成

“consumer 不够多”。前三次扩容这话是对的。再往后基本就是撞到 partition 上限了,或者瓶颈已经移到下游。扩 pod 之前先看每个 partition 的 lag 分布。

另一个常见的:怪 Kafka 自己慢。Kafka broker 在便宜硬件上每秒能扛几百万条。你这边吞吐才几万还撑不住,瓶颈几乎一定在 consumer 代码或者下游服务。

预防

  • 一开始就按最大可能的 consumer 数 × 2-4 倍来规划 partition 数。后期加 partition 会破坏在途数据的 key 序。
  • DLQ 从第一天就接上。poison message 一定会出现。
  • 监控按 partition 分的 lag,不要只看 group 总 lag。平均值会把 bug 藏起来。
  • 新建的 consumer group 默认用 cooperative-sticky assignor。
  • consumer CPU 和下游写延迟当成两个独立信号看;lag 涨但 CPU 没涨,瓶颈在下游。

FAQ

  • 能减少 partition 数来修倾斜吗? 不行。partition 数只能加。要修就只能新建 topic 然后迁移。
  • 每个 consumer 都同步 commit 吗? 同步 commit 更安全但更慢。常见做法是异步 commit + 定期 sync flush。

标签: #后端 #排查 #infra #kafka #messaging #consumer-lag #streaming