Kafka消息丢失问题排查
问题现象
在机器学习模型部署后,发现生产环境中的预测结果出现数据不一致问题,经排查发现部分训练样本在Kafka队列中丢失。
排查步骤
- 确认消费者组状态:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group model-processor
查看consumer lag是否异常,发现部分分区lag持续增长。
- 检查消息积压:
from kafka import KafkaConsumer
consumer = KafkaConsumer('model-input', bootstrap_servers='localhost:9092')
for message in consumer:
print(f"Offset: {message.offset}, Value: {message.value}")
- 验证生产者确认机制:
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
acks='all',
retries=3,
compression_type='gzip'
)
关键监控指标
- 消息丢失率:通过对比生产者发送数量与消费者接收数量计算
- 分区lag:当lag超过1000时触发告警
- 生产者确认延迟:平均ack时间超过500ms告警
告警配置方案
在Prometheus中配置以下规则:
- name: kafka-message-loss
rules:
- alert: HighMessageLossRate
expr: rate(kafka_producer_record_send_total[1m]) - rate(kafka_consumer_fetch_total[1m]) > 0.1
for: 5m
labels:
severity: critical
annotations:
summary: "Kafka消息丢失率过高"
根本原因
通过监控发现,当Kafka集群负载过高时,消费者处理速度跟不上生产者写入速度,导致部分消息在缓冲区超时丢失。建议优化消费者批量处理逻辑并增加集群资源。

讨论