一个 topic 每秒生产 5 万条消息。你的 consumer group 之前还跟得上,流量翻倍之后 lag 就开始爬,你把 consumer Deployment 从 8 个 pod 扩到 24 个。lag 还是涨,部分 pod 现在 CPU 在 0%。Kafka UI 显示 group 是 Stable,但从 produce 到 commit 的端到端延迟从 200 ms 涨到 14 分钟,还在涨。
加 consumer 这个直觉,只在 consumer 是 CPU bound 而且没吃满的时候才对。其他情况下基本都是错的方向。这篇讲怎么找到真正的瓶颈、怎么区分。
常见原因
按命中率从高到低排。
1. consumer 数比 partition 数还多
一个 partition 在同一个 group 内只能被一个 consumer 消费。topic 有 12 个 partition,你开 24 个 consumer,那有 12 个就是闲着。继续加完全没意义。
怎么发现:kafka-consumer-groups.sh --describe --group orders-consumer 看 partition、current-offset、consumer-id。如果好几个 consumer-id 没分到 partition,瓶颈就是 partition 数。
2. 有一条 poison message 把某个 partition 卡住
某个 partition 里有一条消息处理失败,consumer 就一直重试它(或者长 backoff 后重试),永远不 commit 越过它。后面所有消息都在排队。其他 partition 看着都正常,只有这一个堵了。
怎么发现:group 的 lag 集中在 1-2 个 partition 上,其他都是 0。日志里同一个 offset 在反复重试。
3. commit 之前要等慢下游写完
consumer 拉得很快,但每条消息处理都要同步写一个慢下游(Postgres insert、外部 API、embedding 模型调用)。端到端吞吐等于下游吞吐,跟 Kafka 没关系。consumer pod CPU 一直很低。
怎么发现:consumer CPU 不到 30%,下游服务的 p99 远高于你这一条消息的预算(大概是 messages_per_second / num_partitions)。
4. rebalance 在反复抖
每次扩容或者 pod 重启,Kafka 都会暂停整个 group、重新分配 partition、再恢复。如果 max.poll.interval.ms 太短或者消息处理特别慢,rebalance 就会不停触发。group 大部分时间在 rebalance 而不是在消费。
怎么发现:consumer 日志里反复出现 Attempt to heartbeat failed 或 Member ... sending LeaveGroup。kafka-consumer-groups.sh 看 group 状态在 Stable 和 PreparingRebalance 之间反复横跳。
5. max.poll.records 调得太高
一次 poll 5000 条,处理要 90 秒。这 90 秒内一次心跳都没发,broker 把这个 consumer 踢出去,group rebalance,这批活又得重来。lag 上下剧烈震荡。
怎么发现:lag 一会儿掉一会儿冲,反复。日志里 Auto-offset-commit failed 或者 This consumer instance is no longer part of the group。
6. producer 倾斜——所有消息都落在一个 partition
producer 用的 key hash 到了少数几个 partition,或者完全没 key、配 sticky partitioner 又赶上突发流量。某一个 partition 拿到 80% 的流量。无论开多少 consumer,那个 partition 只能被其中一个吃。
怎么发现:kafka-topics.sh --describe --topic orders 加上各 partition 的 produce 监控。如果一个 partition 的 produce 速率是其他的 10 倍,就是倾斜。
7. producer 和 consumer 的压缩配置不匹配
producer 用 zstd 发,consumer 的 fetch.max.bytes 太小,一次装不下一个完整的解压 batch。结果就是 consumer 拉一小撮、解压、处理、再拉,吞吐崩盘。
怎么发现:consumer 这边的网络吞吐远低于 partition 实际生产的量,CPU 大头都在解压上。
最短修复路径
第 1 步:先量清楚 lag 到底在哪儿
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group orders-consumer
重点看每个 partition 的 LAG 那一列。如果 99% 的 lag 都在一个 partition 上,那就是 poison message 或者 producer 倾斜。如果 lag 在所有 partition 上均匀分布,那是吞吐问题。
第 2 步:对比一下 partition 数和 consumer 数
kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic orders
如果 partition < consumer,再扩 consumer 是浪费。先加 partition:
kafka-topics.sh --bootstrap-server kafka:9092 \
--alter --topic orders --partitions 48
partition 数只能加不能减。挑一个能给你 2-4 倍扩容余量的数。
第 3 步:用 dead letter 模式处理 poison message
在 consumer 里给每条消息设一个重试预算。失败 N 次之后扔到 DLQ topic,然后 commit 往前走。
try {
await processMessage(message);
} catch (err) {
const attempts = (message.headers?.attempts ?? 0) + 1;
if (attempts >= 3) {
await producer.send({
topic: 'orders.dlq',
messages: [{ ...message, headers: { ...message.headers, attempts, lastError: err.message } }]
});
} else {
throw err; // 下一次 poll 会重试
}
}
await consumer.commitOffsets([{ topic, partition, offset: message.offset + 1 }]);
绝对不要让单条坏消息永远堵住一个 partition。
第 4 步:max.poll.records 和 max.poll.interval.ms 一起调
关系是:max.poll.records * 单条平均处理时间 < max.poll.interval.ms。
max.poll.records: 500
max.poll.interval.ms: 300000 # 5 分钟
session.timeout.ms: 45000
heartbeat.interval.ms: 3000
batch 小一点 commit 更勤,handler 慢也不会被踢。
第 5 步:如果是下游瓶颈,就 batch 写
不要每条消息一次 DB insert,攒 200 条做一次 bulk insert。bulk insert 成功之后再 commit。
const batch = [];
for await (const message of consumer) {
batch.push(message);
if (batch.length >= 200) {
await db.bulkInsert(batch.map(parse));
await consumer.commitOffsets(lastOffsetFor(batch));
batch.length = 0;
}
}
这一步通常是收益最大的一步。
第 6 步:换更好的 partition key 修 producer 倾斜
如果你的 key 是 userId,而 0.1% 的用户产生了 50% 的事件,那 partition 永远会倾斜。要么换 key(event id,或者能把负载摊开的复合 key),要么把热用户显式 shard。
const key = isHotUser(userId) ? `${userId}:${randomShard()}` : userId;
producer.send({ topic, messages: [{ key, value }] });
第 7 步:换 cooperative-sticky assignor 减轻 rebalance
默认的 range 或 round-robin assignor 一 rebalance 就 stop-the-world。cooperative-sticky 会尽量保留原有的分配。
partition.assignment.strategy: org.apache.kafka.clients.consumer.CooperativeStickyAssignor
扩缩容不再让整个 group 停摆。
这种情况不怪你
broker 端的限流会卡住整个 consumer group,不管你客户端怎么调。如果集群配了 per-client 的 consumer_byte_rate quota 并且你撞上了,再怎么调 consumer 都没用。看 kafka.server:type=Fetch 和 kafka.server:type=ClientQuotaManager 的 JMX 指标,或者问下管集群的人。
集群本身资源不够也是真实原因:broker 磁盘或者网络打满,fetch 就慢,跟你 consumer 数没关系。
容易被误诊成
“consumer 不够多”。前三次扩容这话是对的。再往后基本就是撞到 partition 上限了,或者瓶颈已经移到下游。扩 pod 之前先看每个 partition 的 lag 分布。
另一个常见的:怪 Kafka 自己慢。Kafka broker 在便宜硬件上每秒能扛几百万条。你这边吞吐才几万还撑不住,瓶颈几乎一定在 consumer 代码或者下游服务。
预防
- 一开始就按最大可能的 consumer 数 × 2-4 倍来规划 partition 数。后期加 partition 会破坏在途数据的 key 序。
- DLQ 从第一天就接上。poison message 一定会出现。
- 监控按 partition 分的 lag,不要只看 group 总 lag。平均值会把 bug 藏起来。
- 新建的 consumer group 默认用 cooperative-sticky assignor。
- consumer CPU 和下游写延迟当成两个独立信号看;lag 涨但 CPU 没涨,瓶颈在下游。
FAQ
- 能减少 partition 数来修倾斜吗? 不行。partition 数只能加。要修就只能新建 topic 然后迁移。
- 每个 consumer 都同步 commit 吗? 同步 commit 更安全但更慢。常见做法是异步 commit + 定期 sync flush。
Related
- 消息队列 dead letter 堆积
- RabbitMQ consumer 卡住
- GraphQL rate limit 级联
- Postgres connection pool 被打爆
- Postgres autovacuum 被长事务卡住
- Redis cluster failover 卡住
- Docker OOM killed
- gRPC deadline exceeded
- Cron job 静默跳过
- Webhook 不触发
标签: #后端 #排查 #infra #kafka #messaging #consumer-lag #streaming